Request group description
This commit is contained in:
@@ -29,10 +29,20 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
|
|||||||
@impl true
|
@impl true
|
||||||
def handle_info(:tick, state) do
|
def handle_info(:tick, state) do
|
||||||
consumer_groups = :brod.list_all_groups(state.endpoints, [])
|
consumer_groups = :brod.list_all_groups(state.endpoints, [])
|
||||||
Logger.info("Consumer groups state: #{inspect(consumer_groups)}")
|
|
||||||
|
Enum.each(consumer_groups, fn {broker_info, group_name} ->
|
||||||
|
describe_group(broker_info, group_name)
|
||||||
|
end)
|
||||||
|
|
||||||
Process.send_after(self(), :tick, @interval)
|
Process.send_after(self(), :tick, @interval)
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp describe_group(_, []), do: nil
|
||||||
|
|
||||||
|
defp describe_group(broker_info, group_name) do
|
||||||
|
Logger.info("Getting info for group name: #{group_name}")
|
||||||
|
:brod.describe_groups(broker_info, [], group_name)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user