diff --git a/lib/kafkaex_lag_exporter/behaviours/kafka_utils_behaviour.ex b/lib/kafkaex_lag_exporter/behaviours/kafka_utils_behaviour.ex index 4988f78..bafa2b7 100644 --- a/lib/kafkaex_lag_exporter/behaviours/kafka_utils_behaviour.ex +++ b/lib/kafkaex_lag_exporter/behaviours/kafka_utils_behaviour.ex @@ -15,7 +15,7 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do @callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary) - @callback topic_names_for_consumer_groups( + @callback get_consumer_group_info( {host :: atom, port :: non_neg_integer}, list(binary), list(binary) @@ -35,8 +35,8 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port}) - def topic_names_for_consumer_groups(endpoint, list, consumer_group_names), - do: impl().topic_names_for_consumer_groups(endpoint, list, consumer_group_names) + def get_consumer_group_info(endpoint, list, consumer_group_names), + do: impl().get_consumer_group_info(endpoint, list, consumer_group_names) defp impl, do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils) diff --git a/lib/kafkaex_lag_exporter/kafka_utils.ex b/lib/kafkaex_lag_exporter/kafka_utils.ex index cc891c7..fed79ed 100644 --- a/lib/kafkaex_lag_exporter/kafka_utils.ex +++ b/lib/kafkaex_lag_exporter/kafka_utils.ex @@ -82,25 +82,35 @@ defmodule KafkaexLagExporter.KafkaUtils do [{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], []) groups - |> Enum.filter(fn {_, _, protocol} -> protocol == "consumer" end) - |> Enum.map(fn {_, group_name, "consumer"} -> group_name end) + |> Enum.filter(fn {_, _, protocol} -> protocol === "consumer" end) + |> Enum.map(fn {_, group_name, _} -> group_name end) end @impl true - def topic_names_for_consumer_groups(endpoint, list \\ [], consumer_group_names) do - KafkaWrapper.describe_groups(endpoint, list, consumer_group_names) - |> get_topic_names_for_consumer_groups - end + def get_consumer_group_info(endpoint, list \\ [], consumer_group_names) do + {:ok, group_descriptions} = KafkaWrapper.describe_groups(endpoint, list, consumer_group_names) - defp get_topic_names_for_consumer_groups({:ok, group_descriptions}) do group_descriptions - |> Enum.map(fn %{group_id: consumer_group, members: members} -> [consumer_group, members] end) - |> Enum.map(fn [consumer_group, members] -> {consumer_group, get_topic_names(members)} end) + |> Enum.flat_map(fn %{group_id: consumer_group, members: members} -> + get_member_info(members) + |> Enum.map(fn {topic_names, consumer_id, member_host} -> + {consumer_group, topic_names, consumer_id, member_host} + end) + end) end - defp get_topic_names(members) do - Enum.flat_map(members, fn member -> - KafkaexLagExporter.TopicNameParser.parse_topic_names(member.member_assignment) + @spec get_member_info( + list(%{client_host: binary, member_assignment: binary, member_id: binary}) + ) :: + list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary}) + defp get_member_info(members) do + Enum.map(members, fn %{ + client_host: member_host, + member_assignment: member_assignment, + member_id: consumer_id + } -> + topic_names = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment) + {topic_names, consumer_id, member_host} end) end end