Get current consumer_group offset

This commit is contained in:
2022-02-17 23:37:36 +01:00
parent 3444e7276e
commit 7db418f235
3 changed files with 48 additions and 14 deletions

View File

@@ -0,0 +1,26 @@
defmodule ConsumerOffsetsGenConsumer do
use KafkaEx.GenConsumer
alias KafkaEx.Protocol.Fetch.Message
require Logger
def handle_message_set(message_set, state) do
for %Message{key: key, offset: offset} <- message_set do
Logger.info(fn -> "offset: " <> inspect(offset) end)
consumer_group = get_consumer_group(key)
end
{:async_commit, state}
end
defp get_consumer_group(<<prefix, version, postfix::binary-size(2), consumer_group::binary>>) do
Logger.debug(fn -> "prefix: " <> inspect(prefix) end)
Logger.debug(fn -> "version: " <> inspect(version) end)
Logger.debug(fn -> "postfix: " <> inspect(postfix) end)
consumer_group
end
end

View File

@@ -1,18 +1,27 @@
defmodule KafkaexLagExporter do
@moduledoc """
Documentation for `KafkaexLagExporter`.
"""
use Application
@doc """
Hello world.
def start(_type, _args) do
import Supervisor.Spec
## Examples
consumer_group_opts = [
# setting for the ConsumerGroup
heartbeat_interval: 1_000,
# this setting will be forwarded to the GenConsumer
commit_interval: 1_000
]
iex> KafkaexLagExporter.hello()
:world
gen_consumer_impl = ConsumerOffsetsGenConsumer
consumer_group_name = "offsets_group"
topic_names = ["__consumer_offsets"]
"""
def hello do
:world
children = [
supervisor(
KafkaEx.ConsumerGroup,
[gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]
)
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end