diff --git a/lib/kafkaex_lag_exporter/application.ex b/lib/kafkaex_lag_exporter/application.ex index 313ff6b..d6daccc 100644 --- a/lib/kafkaex_lag_exporter/application.ex +++ b/lib/kafkaex_lag_exporter/application.ex @@ -15,7 +15,7 @@ defmodule KafkaexLagExporter.Application do {Phoenix.PubSub, name: KafkaexLagExporter.PubSub}, # Start the Endpoint (http/https) KafkaexLagExporterWeb.Endpoint, - KafkaexLagExporter.ConsumerOffset + KafkaexLagExporter.ConsumerOffsetRunner ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/kafkaex_lag_exporter/consumer_offset.ex b/lib/kafkaex_lag_exporter/consumer_offset.ex index 5b3dd52..e7b171b 100644 --- a/lib/kafkaex_lag_exporter/consumer_offset.ex +++ b/lib/kafkaex_lag_exporter/consumer_offset.ex @@ -1,41 +1,17 @@ defmodule KafkaexLagExporter.ConsumerOffset do - @moduledoc "Genserver implementation to set offset metrics for consumer groups" + @moduledoc "Struct holding all relevant telemetry information of consumers" - use GenServer + @type t :: %__MODULE__{ + consumer_group: binary, + topic: binary, + lag: list({partition :: non_neg_integer, lag :: non_neg_integer}), + consumer_id: binary, + member_host: binary + } - require Logger - - @interval 5_000 - - def start_link(default) when is_list(default) do - GenServer.start_link(__MODULE__, default, name: __MODULE__) - end - - @impl true - def init(_) do - Logger.info("Starting #{__MODULE__}") - - clients = Application.get_env(:brod, :clients) - endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}] - - Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}") - - Process.send_after(self(), :tick, @interval) - - {:ok, %{endpoints: endpoints}} - end - - @impl true - def handle_info(:tick, state) do - [endpoint | _] = state.endpoints - - %{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint) - - KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags) - KafkaexLagExporter.Metrics.group_sum_lag(endpoint, 
lag_sum) - - Process.send_after(self(), :tick, @interval) - - {:noreply, state} - end + defstruct consumer_group: "", + topic: "", + lag: [], + consumer_id: "", + member_host: "" end diff --git a/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex b/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex index c44c4e9..38abeb8 100644 --- a/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex +++ b/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex @@ -1,6 +1,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do @moduledoc "Calculate summarized lag for each consumer group" + alias KafkaexLagExporter.ConsumerOffset alias KafkaexLagExporter.KafkaUtils # TODO fix type @@ -12,7 +13,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint) consumer_lags = - KafkaUtils.topic_names_for_consumer_groups(endpoint, [], consumer_group_names) + KafkaUtils.get_consumer_group_info(endpoint, [], consumer_group_names) |> Enum.flat_map(&get_lag_per_topic(&1)) consumer_lag_sum = get_lag_for_consumer_sum(consumer_lags) @@ -20,16 +21,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do %{lags: consumer_lags, sum: consumer_lag_sum} end - defp get_lag_per_topic({consumer_group, topics}) do + @spec get_lag_per_topic( + {consumer_group :: binary, topics :: list(binary), consumer_id :: binary, + member_host :: binary} + ) :: list(ConsumerOffset.t()) + defp get_lag_per_topic({consumer_group, topics, consumer_id, member_host}) do Enum.map(topics, fn topic -> lag = KafkaUtils.lag(topic, consumer_group, :client1) - {consumer_group, topic, lag} + + %ConsumerOffset{ + consumer_group: consumer_group, + topic: topic, + lag: lag, + consumer_id: consumer_id, + member_host: member_host + } end) end + @spec get_lag_for_consumer_sum(list(ConsumerOffset.t())) :: list(ConsumerOffset.t()) defp get_lag_for_consumer_sum(lags_per_consumer_group) do - Enum.map(lags_per_consumer_group, fn {consumer_group, topic, lag_per_partition} -> - 
{consumer_group, topic, sum_topic_lag(lag_per_partition, 0)} + Enum.map(lags_per_consumer_group, fn consumer_offset -> + lag_sum = sum_topic_lag(consumer_offset.lag, 0) + %ConsumerOffset{consumer_offset | lag: {0, lag_sum}} + end) end diff --git a/lib/kafkaex_lag_exporter/consumer_offset_runner.ex b/lib/kafkaex_lag_exporter/consumer_offset_runner.ex new file mode 100644 index 0000000..591c1d4 --- /dev/null +++ b/lib/kafkaex_lag_exporter/consumer_offset_runner.ex @@ -0,0 +1,41 @@ +defmodule KafkaexLagExporter.ConsumerOffsetRunner do + @moduledoc "Genserver implementation to set offset metrics for consumer groups" + + use GenServer + + require Logger + + @interval 5_000 + + def start_link(default) when is_list(default) do + GenServer.start_link(__MODULE__, default, name: __MODULE__) + end + + @impl true + def init(_) do + Logger.info("Starting #{__MODULE__}") + + clients = Application.get_env(:brod, :clients) + endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}] + + Logger.info("Received Kafka endpoints: #{inspect(endpoints)}") + + Process.send_after(self(), :tick, @interval) + + {:ok, %{endpoints: endpoints}} + end + + @impl true + def handle_info(:tick, state) do + [endpoint | _] = state.endpoints + + %{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint) + + KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags) + KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum) + + Process.send_after(self(), :tick, @interval) + + {:noreply, state} + end +end diff --git a/lib/kafkaex_lag_exporter/metrics.ex b/lib/kafkaex_lag_exporter/metrics.ex index b3e1882..3e2129f 100644 --- a/lib/kafkaex_lag_exporter/metrics.ex +++ b/lib/kafkaex_lag_exporter/metrics.ex @@ -3,6 +3,8 @@ defmodule KafkaexLagExporter.Metrics do use PromEx.Plugin + alias KafkaexLagExporter.ConsumerOffset + require Logger @kafka_event :kafka @@ -21,52 +23,52 @@ defmodule KafkaexLagExporter.Metrics do event_name: [@kafka_event, :consumergroup, 
:group, :topic, :sum, :lag], description: "Sum of group offset lag across topic partitions", measurement: :lag, - # TODO: add more tags like member_host, consumer_id, client_id, ... - tags: [:cluster_name, :group, :topic] + tags: [:cluster_name, :group, :topic, :consumer_id, :member_host] ), last_value( [@kafka_event, :consumergroup, :group, :lag], event_name: [@kafka_event, :consumergroup, :group, :lag], description: "Group offset lag of a partition", measurement: :lag, - # TODO: add more tags like member_host, consumer_id, client_id, ... - tags: [:cluster_name, :group, :partition, :topic] + tags: [:cluster_name, :group, :partition, :topic, :consumer_id, :member_host] ) ] ) end @doc false - def group_sum_lag({host, _port}, consumer_lags) do - Enum.each(consumer_lags, fn {group_name, lag} -> + def group_sum_lag({host, _port}, consumer_offsets) do + Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset -> + lag = elem(consumer_offset.lag, 1) + :telemetry.execute( [@kafka_event, :consumergroup, :group, :topic, :sum, :lag], - %{ - lag: lag - }, + %{lag: lag}, %{ cluster_name: host, - group: group_name, - topic: [] + group: consumer_offset.consumer_group, + topic: consumer_offset.topic, + consumer_id: consumer_offset.consumer_id, + member_host: consumer_offset.member_host } ) end) end @doc false - def group_lag_per_partition({host, _port}, consumer_lags) do - Enum.each(consumer_lags, fn {group_name, lags} -> + def group_lag_per_partition({host, _port}, consumer_offsets) do + Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset -> + Enum.each(consumer_offset.lag, fn {partition, lag} -> :telemetry.execute( [@kafka_event, :consumergroup, :group, :lag], - %{ - lag: lag - }, + %{lag: lag}, %{ cluster_name: host, - group: group_name, + group: consumer_offset.consumer_group, partition: partition, - topic: [] + topic: consumer_offset.topic, + consumer_id: consumer_offset.consumer_id, + member_host: 
consumer_offset.member_host } ) end) diff --git a/test/consumer_offset_fetcher_test.exs b/test/consumer_offset_fetcher_test.exs index e16838e..d19717d 100644 --- a/test/consumer_offset_fetcher_test.exs +++ b/test/consumer_offset_fetcher_test.exs @@ -2,6 +2,8 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do use ExUnit.Case use Patch + alias KafkaexLagExporter.ConsumerOffset + @test_consumer_group_name1 "test_consumer_1" @test_consumer_group_name2 "test_consumer_2" @test_lags1 [{0, 23}, {1, 42}, {2, 666}] @@ -9,6 +11,9 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do @test_topic1 "test_topic_1" @test_topic2 "test_topic_2" @test_topic3 "test_topic_3" + @test_consumer_id1 "test_consumer_id1" + @test_consumer_id2 "test_consumer_id2" + @test_member_host "127.0.0.1" setup do patch( @@ -21,11 +26,12 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do patch( KafkaexLagExporter.KafkaUtils, - :topic_names_for_consumer_groups, + :get_consumer_group_info, fn _, _, _ -> [ - {@test_consumer_group_name1, [@test_topic1, @test_topic2]}, - {@test_consumer_group_name2, [@test_topic3]} + {@test_consumer_group_name1, [@test_topic1, @test_topic2], @test_consumer_id1, + @test_member_host}, + {@test_consumer_group_name2, [@test_topic3], @test_consumer_id2, @test_member_host} ] end ) @@ -39,15 +45,51 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do %{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint) assert sum == [ - {@test_consumer_group_name1, @test_topic1, 731}, - {@test_consumer_group_name1, @test_topic2, 6}, - {@test_consumer_group_name2, @test_topic3, 6} + %ConsumerOffset{ + consumer_group: @test_consumer_group_name1, + topic: @test_topic1, + lag: {0, 731}, + consumer_id: @test_consumer_id1, + member_host: @test_member_host + }, + %ConsumerOffset{ + consumer_group: @test_consumer_group_name1, + topic: @test_topic2, + lag: {0, 6}, + consumer_id: @test_consumer_id1, + member_host: @test_member_host + }, + 
%ConsumerOffset{ + consumer_group: @test_consumer_group_name2, + topic: @test_topic3, + lag: {0, 6}, + consumer_id: @test_consumer_id2, + member_host: @test_member_host + } ] assert lags == [ - {@test_consumer_group_name1, @test_topic1, @test_lags1}, - {@test_consumer_group_name1, @test_topic2, @test_lags2}, - {@test_consumer_group_name2, @test_topic3, @test_lags2} + %ConsumerOffset{ + consumer_group: @test_consumer_group_name1, + topic: @test_topic1, + lag: @test_lags1, + consumer_id: @test_consumer_id1, + member_host: @test_member_host + }, + %ConsumerOffset{ + consumer_group: @test_consumer_group_name1, + topic: @test_topic2, + lag: @test_lags2, + consumer_id: @test_consumer_id1, + member_host: @test_member_host + }, + %ConsumerOffset{ + consumer_group: @test_consumer_group_name2, + topic: @test_topic3, + lag: @test_lags2, + consumer_id: @test_consumer_id2, + member_host: @test_member_host + } ] end