diff --git a/lib/kafkaex_lag_exporter/consumer_offset_runner.ex b/lib/kafkaex_lag_exporter/consumer_offset_runner.ex index 41e2ea1..a67689c 100644 --- a/lib/kafkaex_lag_exporter/consumer_offset_runner.ex +++ b/lib/kafkaex_lag_exporter/consumer_offset_runner.ex @@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do require Logger - @interval 5_000 - def start_link(default) when is_list(default) do GenServer.start_link(__MODULE__, default, name: __MODULE__) end @@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do clients = Application.get_env(:brod, :clients) endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}] + interval = + System.get_env("KAFKA_EX_INTERVAL", "5000") + |> String.to_integer() + Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}") + Logger.info("Updating lag information every #{interval} milliseconds") - Process.send_after(self(), :tick, @interval) + Process.send_after(self(), :tick, interval) - {:ok, %{endpoints: endpoints}} + {:ok, %{endpoints: endpoints, interval: interval}} end @impl true def handle_info(:tick, state) do [endpoint | _] = state.endpoints + interval = state.interval %{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint) KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags) KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum) - Process.send_after(self(), :tick, @interval) + Process.send_after(self(), :tick, interval) {:noreply, state} end