From d3c3372ee003ab9a3eebda8afe2fb79724e68f09 Mon Sep 17 00:00:00 2001
From: Pascal Schmid
Date: Wed, 26 Jul 2023 23:09:25 +0200
Subject: [PATCH] Fetch lag per consumer group

---
 config/dev.exs                                | 18 ++++
 config/prod.exs                               | 18 ++++
 .../consumer_offset_fetcher.ex                | 61 ++++++++++---
 lib/kafkaex_lag_exporter/utils.ex             | 96 +++++++++++++++++++
 4 files changed, 181 insertions(+), 12 deletions(-)
 create mode 100644 lib/kafkaex_lag_exporter/utils.ex

diff --git a/config/dev.exs b/config/dev.exs
index f9b637b..b57549b 100644
--- a/config/dev.exs
+++ b/config/dev.exs
@@ -49,3 +49,21 @@ config :phoenix, :stacktrace_depth, 20
 
 # Initialize plugs at runtime for faster development compilation
 config :phoenix, :plug_init_mode, :runtime
+
+config :brod,
+  clients: [
+    client1: [
+      # non ssl
+      endpoints: [redpanda: 29092],
+      # endpoints: [localhost: 9093], # ssl
+      # for safety, default true
+      allow_topic_auto_creation: false,
+      # get_metadata_timeout_seconds: 5, # default 5
+      # max_metadata_sock_retry: 2, # seems obsolete
+      max_metadata_sock_retry: 5,
+      # query_api_versions: false, # default true, set false for Kafka < 0.10
+      # reconnect_cool_down_seconds: 1, # default 1
+      # default 5
+      restart_delay_seconds: 10
+    ]
+  ]
diff --git a/config/prod.exs b/config/prod.exs
index bc14ad6..6cddf1b 100644
--- a/config/prod.exs
+++ b/config/prod.exs
@@ -48,3 +48,21 @@ config :logger, level: :info
 #       force_ssl: [hsts: true]
 #
 # Check `Plug.SSL` for all available options in `force_ssl`.
+
+config :brod,
+  clients: [
+    client1: [
+      # non ssl
+      endpoints: [redpanda: 29092],
+      # endpoints: [localhost: 9093], # ssl
+      # for safety, default true
+      allow_topic_auto_creation: false,
+      # get_metadata_timeout_seconds: 5, # default 5
+      # max_metadata_sock_retry: 2, # seems obsolete
+      max_metadata_sock_retry: 5,
+      # query_api_versions: false, # default true, set false for Kafka < 0.10
+      # reconnect_cool_down_seconds: 1, # default 1
+      # default 5
+      restart_delay_seconds: 10
+    ]
+  ]
diff --git a/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex b/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex
index bd7f524..bb6ffc3 100644
--- a/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex
+++ b/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex
@@ -1,7 +1,5 @@
 defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
-  @moduledoc """
-  Genserver implementation to consume new messages on topic '__consumer_offsets'
-  """
+  @moduledoc "Genserver implementation to calculate summarized lag for each consumer group"
 
   use GenServer
 
@@ -28,21 +26,60 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
 
   @impl true
   def handle_info(:tick, state) do
-    consumer_groups = :brod.list_all_groups(state.endpoints, [])
+    # Only the first configured endpoint is queried; without one, use the default broker.
+    endpoint = List.first(state.endpoints || []) || {"redpanda", 29_092}
 
-    Enum.each(consumer_groups, fn {broker_info, group_name} ->
-      describe_group(broker_info, group_name)
-    end)
+    consumer_group_names = get_consumer_group_names(endpoint)
+
+    consumer_lags =
+      endpoint
+      |> :brod.describe_groups([], consumer_group_names)
+      |> get_consumer_lags()
+
+    Logger.info("Consumer lags: #{inspect(consumer_lags)}")
 
     Process.send_after(self(), :tick, @interval)
 
     {:noreply, state}
   end
 
-  defp describe_group(_, []), do: nil
-
-  defp describe_group(broker_info, group_name) do
-    Logger.info("Getting info for group name: #{group_name}")
-    :brod.describe_groups(broker_info, [], group_name)
+  # Returns the names of all groups that use the "consumer" protocol on the broker at
+  # `endpoint`. A failed lookup is logged and reported as "no groups" so that one
+  # unreachable broker does not crash the periodic tick.
+  defp get_consumer_group_names(endpoint) do
+    case :brod.list_all_groups([endpoint], []) do
+      [{_, groups} | _] when is_list(groups) ->
+        for {_, group_name, "consumer"} <- groups, do: group_name
+
+      other ->
+        Logger.warning("Could not list consumer groups: #{inspect(other)}")
+        []
+    end
+  end
+
+  # Builds a `[consumer_group, total_lag]` pair for every described group.
+  defp get_consumer_lags({:ok, group_descriptions}) do
+    Enum.map(group_descriptions, fn %{group_id: consumer_group, members: members} ->
+      [consumer_group, get_lag_for_consumer(consumer_group, get_topic_names(members))]
+    end)
+  end
+
+  # Anything but `{:ok, _}` (e.g. `{:error, reason}`) yields no lag entries.
+  defp get_consumer_lags({_, _}), do: []
+
+  # Collects the topics assigned to the members of one group. Several members can be
+  # assigned the same topic, so names are de-duplicated to count each topic's lag once.
+  defp get_topic_names(members) do
+    members
+    |> Enum.flat_map(fn member ->
+      KafkaexLagExporter.TopicNameParser.parse_topic_names(member.member_assignment)
+    end)
+    |> Enum.uniq()
+
+  # Sums the lag of `consumer_group` over all given topics.
+  defp get_lag_for_consumer(consumer_group, topics) do
+    Enum.reduce(topics, 0, fn topic, acc ->
+      acc + KafkaexLagExporter.KafkaUtils.lag_total(topic, consumer_group, :client1)
+    end)
   end
 end
diff --git a/lib/kafkaex_lag_exporter/utils.ex b/lib/kafkaex_lag_exporter/utils.ex
new file mode 100644
index 0000000..f6447c6
--- /dev/null
+++ b/lib/kafkaex_lag_exporter/utils.ex
@@ -0,0 +1,96 @@
+# source code taken from https://github.com/reachfh/brod_group_subscriber_example
+
+defmodule KafkaexLagExporter.KafkaUtils do
+  @moduledoc "Utility functions for dealing with Kafka"
+
+  @default_client :client1
+
+  @type endpoint() :: {host :: atom() | charlist(), port :: non_neg_integer()}
+
+  @doc "Returns the connection settings of the default brod client."
+  @spec connection() :: {list(endpoint()), Keyword.t()}
+  def connection, do: connection(@default_client)
+
+  @doc """
+  Returns `{endpoints, sock_opts}` for `client`, read from `config :brod, :clients`.
+
+  Falls back to `localhost:9092` when no endpoints are configured. `sock_opts` contains
+  the `:ssl` option only when the client configuration has one.
+  """
+  @spec connection(atom) :: {list(endpoint()), Keyword.t()}
+  def connection(client) do
+    config =
+      :brod
+      |> Application.get_env(:clients, [])
+      |> Keyword.get(client, [])
+
+    endpoints = config[:endpoints] || [{~c"localhost", 9092}]
+
+    {endpoints, Keyword.take(config, [:ssl])}
+  end
+
+  @doc """
+  Resolves the `type` offset (`:earliest` or `:latest`) of every partition of `topic`.
+
+  Returns `{partition, offset}` tuples in ascending partition order. Raises a
+  `MatchError` when the partition count or an offset cannot be retrieved.
+  """
+  @spec resolve_offsets(binary(), :earliest | :latest, atom()) ::
+          list({non_neg_integer(), integer()})
+  def resolve_offsets(topic, type, client) do
+    {endpoints, sock_opts} = connection(client)
+
+    {:ok, partitions_count} = :brod.get_partitions_count(client, topic)
+
+    for partition <- Range.new(0, partitions_count - 1) do
+      {:ok, offset} = :brod.resolve_offset(endpoints, topic, partition, type, sock_opts)
+      {partition, offset}
+    end
+  end
+
+  @doc """
+  Fetches the offsets `consumer_group` has committed for `topic`.
+
+  Returns `{partition, committed_offset}` tuples. Partitions of other topics consumed
+  by the same group are left out, so the result lines up with `resolve_offsets/3`.
+  """
+  @spec fetch_committed_offsets(binary(), binary(), atom()) ::
+          list({non_neg_integer(), integer()})
+  def fetch_committed_offsets(topic, consumer_group, client) do
+    {endpoints, sock_opts} = connection(client)
+    {:ok, response} = :brod.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
+
+    # NOTE(review): assumes every response entry carries its topic under `:name`, in line
+    # with the `:partition_index`/`:committed_offset` keys read below - confirm against brod.
+    for r <- response,
+        r[:name] == topic,
+        pr <- r[:partitions],
+        do: {pr[:partition_index], pr[:committed_offset]}
+  end
+
+  @doc """
+  Calculates the lag of `consumer_group` for every partition of `topic`.
+
+  Latest and committed offsets are joined on the partition number. Partitions without
+  a committed offset (no entry, or a negative offset) are skipped.
+  """
+  @spec lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()})
+  def lag(topic, consumer_group, client) do
+    offsets = resolve_offsets(topic, :latest, client)
+    committed_offsets = Map.new(fetch_committed_offsets(topic, consumer_group, client))
+
+    for {part, current} <- offsets,
+        committed = Map.get(committed_offsets, part, -1),
+        committed >= 0 do
+      {part, current - committed}
+    end
+  end
+
+  @doc "Sums the lag of `consumer_group` over all partitions of `topic`."
+  @spec lag_total(binary(), binary(), atom()) :: integer()
+  def lag_total(topic, consumer_group, client) do
+    topic
+    |> lag(consumer_group, client)
+    |> Enum.reduce(0, fn {_part, recs}, acc -> acc + recs end)
+  end
+end