Add metrics stub
This commit is contained in:
@@ -8,7 +8,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
|
|||||||
@interval 5_000
|
@interval 5_000
|
||||||
|
|
||||||
def start_link(default) when is_list(default) do
|
def start_link(default) when is_list(default) do
|
||||||
GenServer.start_link(__MODULE__, default)
|
GenServer.start_link(__MODULE__, default, name: __MODULE__)
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
@@ -34,7 +34,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
|
|||||||
:brod.describe_groups(endpoint, [], consumer_group_names)
|
:brod.describe_groups(endpoint, [], consumer_group_names)
|
||||||
|> get_consumer_lags
|
|> get_consumer_lags
|
||||||
|
|
||||||
Logger.info("Consumer lags: #{inspect(consumer_lags)}")
|
KafkaexLagExporter.Metrics.kafka_metrics(endpoint, consumer_lags)
|
||||||
|
|
||||||
Process.send_after(self(), :tick, @interval)
|
Process.send_after(self(), :tick, @interval)
|
||||||
|
|
||||||
@@ -57,6 +57,9 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
|
|||||||
|> Enum.map(fn [consumer_group, members] -> [consumer_group, get_topic_names(members)] end)
|
|> Enum.map(fn [consumer_group, members] -> [consumer_group, get_topic_names(members)] end)
|
||||||
|> Enum.map(fn [consumer_group, topics] ->
|
|> Enum.map(fn [consumer_group, topics] ->
|
||||||
[consumer_group, get_lag_for_consumer(consumer_group, topics)]
|
[consumer_group, get_lag_for_consumer(consumer_group, topics)]
|
||||||
|
|
||||||
|
# credo:disable-for-next-line
|
||||||
|
# TODO: [consumer_group, topic, get_lag_for_consumer(consumer_group, topic)]
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
46
lib/kafkaex_lag_exporter/metrics.ex
Normal file
46
lib/kafkaex_lag_exporter/metrics.ex
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
defmodule KafkaexLagExporter.Metrics do
|
||||||
|
@moduledoc "Metrics module is responsible for building and collecting kafka metrics"
|
||||||
|
|
||||||
|
use PromEx.Plugin
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
@kafka_event :kafka
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def manual_metrics(_opts) do
|
||||||
|
clients = Application.get_env(:brod, :clients)
|
||||||
|
[endpoint | _] = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
|
||||||
|
|
||||||
|
Manual.build(
|
||||||
|
:application_versions_manual_metrics,
|
||||||
|
{__MODULE__, :kafka_metrics, [endpoint, []]},
|
||||||
|
[
|
||||||
|
last_value(
|
||||||
|
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
||||||
|
event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
||||||
|
description: "Sum of group offset lag across topic partitions",
|
||||||
|
measurement: :lag,
|
||||||
|
tags: [:cluster_name, :group, :topic]
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
def kafka_metrics({host, _port}, consumer_lags) do
|
||||||
|
Enum.each(consumer_lags, fn [group_name, lag] ->
|
||||||
|
:telemetry.execute(
|
||||||
|
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
||||||
|
%{
|
||||||
|
lag: lag
|
||||||
|
},
|
||||||
|
%{
|
||||||
|
cluster_name: host,
|
||||||
|
group: group_name,
|
||||||
|
topic: []
|
||||||
|
}
|
||||||
|
)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -61,7 +61,7 @@ defmodule KafkaexLagExporter.PromEx do
|
|||||||
[
|
[
|
||||||
# PromEx built in plugins
|
# PromEx built in plugins
|
||||||
Plugins.Application,
|
Plugins.Application,
|
||||||
Plugins.Beam
|
Plugins.Beam,
|
||||||
# {Plugins.Phoenix, router: KafkaexLagExporterWeb.Router, endpoint: KafkaexLagExporterWeb.Endpoint},
|
# {Plugins.Phoenix, router: KafkaexLagExporterWeb.Router, endpoint: KafkaexLagExporterWeb.Endpoint},
|
||||||
# Plugins.Ecto,
|
# Plugins.Ecto,
|
||||||
# Plugins.Oban,
|
# Plugins.Oban,
|
||||||
@@ -70,7 +70,7 @@ defmodule KafkaexLagExporter.PromEx do
|
|||||||
# Plugins.Broadway,
|
# Plugins.Broadway,
|
||||||
|
|
||||||
# Add your own PromEx metrics plugins
|
# Add your own PromEx metrics plugins
|
||||||
# KafkaexLagExporter.Users.PromExPlugin
|
KafkaexLagExporter.Metrics
|
||||||
]
|
]
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user