diff --git a/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex b/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex index bb6ffc3..ee929c1 100644 --- a/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex +++ b/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex @@ -8,7 +8,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do @interval 5_000 def start_link(default) when is_list(default) do - GenServer.start_link(__MODULE__, default) + GenServer.start_link(__MODULE__, default, name: __MODULE__) end @impl true @@ -34,7 +34,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do :brod.describe_groups(endpoint, [], consumer_group_names) |> get_consumer_lags - Logger.info("Consumer lags: #{inspect(consumer_lags)}") + KafkaexLagExporter.Metrics.kafka_metrics(endpoint, consumer_lags) Process.send_after(self(), :tick, @interval) @@ -57,6 +57,9 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do |> Enum.map(fn [consumer_group, members] -> [consumer_group, get_topic_names(members)] end) |> Enum.map(fn [consumer_group, topics] -> [consumer_group, get_lag_for_consumer(consumer_group, topics)] + + # credo:disable-for-next-line + # TODO: [consumer_group, topic, get_lag_for_consumer(consumer_group, topic)] end) end diff --git a/lib/kafkaex_lag_exporter/metrics.ex b/lib/kafkaex_lag_exporter/metrics.ex new file mode 100644 index 0000000..c07e844 --- /dev/null +++ b/lib/kafkaex_lag_exporter/metrics.ex @@ -0,0 +1,46 @@ +defmodule KafkaexLagExporter.Metrics do + @moduledoc "Metrics module is responsible for building and collecting kafka metrics" + + use PromEx.Plugin + + require Logger + + @kafka_event :kafka + + @impl true + def manual_metrics(_opts) do + clients = Application.get_env(:brod, :clients) + [endpoint | _] = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}] + + Manual.build( + :application_versions_manual_metrics, + {__MODULE__, :kafka_metrics, [endpoint, []]}, + [ + last_value( + [@kafka_event, :consumergroup, :group, :topic, :sum, :lag], + event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag], + description: "Sum of group offset lag across topic partitions", + measurement: :lag, + tags: [:cluster_name, :group, :topic] + ) + ] + ) + end + + @doc false + def kafka_metrics({host, _port}, consumer_lags) do + Enum.each(consumer_lags, fn [group_name, lag] -> + :telemetry.execute( + [@kafka_event, :consumergroup, :group, :topic, :sum, :lag], + %{ + lag: lag + }, + %{ + cluster_name: host, + group: group_name, + topic: [] + } + ) + end) + end +end diff --git a/lib/kafkaex_lag_exporter/prom_ex.ex b/lib/kafkaex_lag_exporter/prom_ex.ex index 5552e7f..b8cacea 100644 --- a/lib/kafkaex_lag_exporter/prom_ex.ex +++ b/lib/kafkaex_lag_exporter/prom_ex.ex @@ -61,7 +61,7 @@ defmodule KafkaexLagExporter.PromEx do [ # PromEx built in plugins Plugins.Application, - Plugins.Beam + Plugins.Beam, # {Plugins.Phoenix, router: KafkaexLagExporterWeb.Router, endpoint: KafkaexLagExporterWeb.Endpoint}, # Plugins.Ecto, # Plugins.Oban, @@ -70,7 +70,7 @@ defmodule KafkaexLagExporter.PromEx do # Plugins.Broadway, # Add your own PromEx metrics plugins - # KafkaexLagExporter.Users.PromExPlugin + KafkaexLagExporter.Metrics ] end end