Get lags per partition

This commit is contained in:
2024-03-02 21:23:29 +01:00
parent da7c801c02
commit e63e550a6b
2 changed files with 34 additions and 19 deletions

View File

@@ -53,8 +53,13 @@ defmodule KafkaexLagExporter.KafkaUtils do
@spec lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()})
def lag(topic, consumer_group, client) do
offsets = resolve_offsets(topic, :latest, client)
committed_offsets = fetch_committed_offsets(topic, consumer_group, client)
offsets =
resolve_offsets(topic, :latest, client)
|> Enum.sort_by(fn {key, _value} -> key end)
committed_offsets =
fetch_committed_offsets(topic, consumer_group, client)
|> Enum.sort_by(fn {key, _value} -> key end)
for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
{part, current - committed}