Export lag per parition
This commit is contained in:
@@ -14,21 +14,30 @@ defmodule KafkaexLagExporter.Metrics do
|
|||||||
|
|
||||||
Manual.build(
|
Manual.build(
|
||||||
:application_versions_manual_metrics,
|
:application_versions_manual_metrics,
|
||||||
{__MODULE__, :kafka_metrics, [endpoint, []]},
|
{__MODULE__, :group_sum_lag, [endpoint, []]},
|
||||||
[
|
[
|
||||||
last_value(
|
last_value(
|
||||||
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
||||||
event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
||||||
description: "Sum of group offset lag across topic partitions",
|
description: "Sum of group offset lag across topic partitions",
|
||||||
measurement: :lag,
|
measurement: :lag,
|
||||||
|
# TODO: add more tags like member_host, consumer_id, client_id, ...
|
||||||
tags: [:cluster_name, :group, :topic]
|
tags: [:cluster_name, :group, :topic]
|
||||||
|
),
|
||||||
|
last_value(
|
||||||
|
[@kafka_event, :consumergroup, :group, :lag],
|
||||||
|
event_name: [@kafka_event, :consumergroup, :group, :lag],
|
||||||
|
description: "Group offset lag of a partition",
|
||||||
|
measurement: :lag,
|
||||||
|
# TODO: add more tags like member_host, consumer_id, client_id, ...
|
||||||
|
tags: [:cluster_name, :group, :partition, :topic]
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc false
|
@doc false
|
||||||
def kafka_metrics({host, _port}, consumer_lags) do
|
def group_sum_lag({host, _port}, consumer_lags) do
|
||||||
Enum.each(consumer_lags, fn [group_name, lag] ->
|
Enum.each(consumer_lags, fn [group_name, lag] ->
|
||||||
:telemetry.execute(
|
:telemetry.execute(
|
||||||
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
||||||
@@ -43,4 +52,24 @@ defmodule KafkaexLagExporter.Metrics do
|
|||||||
)
|
)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
def group_lag_per_partition({host, _port}, consumer_lags) do
|
||||||
|
Enum.each(consumer_lags, fn [group_name, lags] ->
|
||||||
|
Enum.each(lags, fn {partition, lag} ->
|
||||||
|
:telemetry.execute(
|
||||||
|
[@kafka_event, :consumergroup, :group, :lag],
|
||||||
|
%{
|
||||||
|
lag: lag
|
||||||
|
},
|
||||||
|
%{
|
||||||
|
cluster_name: host,
|
||||||
|
group: group_name,
|
||||||
|
partition: partition,
|
||||||
|
topic: []
|
||||||
|
}
|
||||||
|
)
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user