Return more data for members
This commit is contained in:
@@ -15,7 +15,7 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
|
||||
|
||||
@callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary)
|
||||
|
||||
@callback topic_names_for_consumer_groups(
|
||||
@callback get_consumer_group_info(
|
||||
{host :: atom, port :: non_neg_integer},
|
||||
list(binary),
|
||||
list(binary)
|
||||
@@ -35,8 +35,8 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
|
||||
|
||||
def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port})
|
||||
|
||||
def topic_names_for_consumer_groups(endpoint, list, consumer_group_names),
|
||||
do: impl().topic_names_for_consumer_groups(endpoint, list, consumer_group_names)
|
||||
def get_consumer_group_info(endpoint, list, consumer_group_names),
|
||||
do: impl().get_consumer_group_info(endpoint, list, consumer_group_names)
|
||||
|
||||
defp impl,
|
||||
do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils)
|
||||
|
||||
@@ -82,25 +82,35 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
||||
[{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], [])
|
||||
|
||||
groups
|
||||
|> Enum.filter(fn {_, _, protocol} -> protocol == "consumer" end)
|
||||
|> Enum.map(fn {_, group_name, "consumer"} -> group_name end)
|
||||
|> Enum.filter(fn {_, _, protocol} -> protocol === "consumer" end)
|
||||
|> Enum.map(fn {_, group_name, _} -> group_name end)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def topic_names_for_consumer_groups(endpoint, list \\ [], consumer_group_names) do
|
||||
KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
|
||||
|> get_topic_names_for_consumer_groups
|
||||
end
|
||||
def get_consumer_group_info(endpoint, list \\ [], consumer_group_names) do
|
||||
{:ok, group_descriptions} = KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
|
||||
|
||||
defp get_topic_names_for_consumer_groups({:ok, group_descriptions}) do
|
||||
group_descriptions
|
||||
|> Enum.map(fn %{group_id: consumer_group, members: members} -> [consumer_group, members] end)
|
||||
|> Enum.map(fn [consumer_group, members] -> {consumer_group, get_topic_names(members)} end)
|
||||
|> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
|
||||
get_member_info(members)
|
||||
|> Enum.map(fn {topic_names, consumer_id, member_host} ->
|
||||
{consumer_group, topic_names, consumer_id, member_host}
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
defp get_topic_names(members) do
|
||||
Enum.flat_map(members, fn member ->
|
||||
KafkaexLagExporter.TopicNameParser.parse_topic_names(member.member_assignment)
|
||||
@spec get_member_info(
|
||||
list(%{client_host: binary, member_assignment: binary, member_id: binary})
|
||||
) ::
|
||||
list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
|
||||
defp get_member_info(members) do
|
||||
Enum.map(members, fn %{
|
||||
client_host: member_host,
|
||||
member_assignment: member_assignment,
|
||||
member_id: consumer_id
|
||||
} ->
|
||||
topic_names = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
|
||||
{topic_names, consumer_id, member_host}
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user