defmodule KafkaexLagExporter.ConsumerOffsetFetcher do @moduledoc "Calculate summarized lag for each consumer group" require Logger # TODO: change return type @spec get(KafkaexLagExporter.KafkaWrapper.endpoint()) :: {any(), any()} def get(endpoint) do consumer_group_names = KafkaexLagExporter.KafkaUtils.get_consumer_group_names(endpoint) consumer_lags = KafkaexLagExporter.KafkaUtils.topic_names_for_consumer_groups( endpoint, [], consumer_group_names ) |> Enum.map(fn [consumer_group, topics] -> [consumer_group, get_lag_for_consumer(consumer_group, topics)] end) consumer_lag_sum = get_lag_for_consumer_sum(consumer_lags) %{lags: consumer_lags, sum: consumer_lag_sum} end defp get_lag_for_consumer(consumer_group, topics) do topics |> Enum.flat_map(fn topic -> KafkaexLagExporter.KafkaUtils.lag(topic, consumer_group, :client1) end) end defp get_lag_for_consumer_sum(lags_per_consumer_group) do lags_per_consumer_group |> Enum.map(fn [topic, lag_per_partition] -> [topic, sum_topic_lag(lag_per_partition, 0)] end) end defp sum_topic_lag([], acc), do: acc defp sum_topic_lag([h | t], acc), do: sum_topic_lag(t, acc + elem(h, 1)) end