Make update interval configurable
This commit is contained in:
@@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
|
|||||||
|
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
@interval 5_000
|
|
||||||
|
|
||||||
def start_link(default) when is_list(default) do
|
def start_link(default) when is_list(default) do
|
||||||
GenServer.start_link(__MODULE__, default, name: __MODULE__)
|
GenServer.start_link(__MODULE__, default, name: __MODULE__)
|
||||||
end
|
end
|
||||||
@@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
|
|||||||
clients = Application.get_env(:brod, :clients)
|
clients = Application.get_env(:brod, :clients)
|
||||||
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
|
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
|
||||||
|
|
||||||
|
interval =
|
||||||
|
System.get_env("KAFKA_EX_INTERVAL", "5000")
|
||||||
|
|> String.to_integer()
|
||||||
|
|
||||||
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
|
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
|
||||||
|
Logger.info("Updating lag information every #{interval} milliseconds")
|
||||||
|
|
||||||
Process.send_after(self(), :tick, @interval)
|
Process.send_after(self(), :tick, interval)
|
||||||
|
|
||||||
{:ok, %{endpoints: endpoints}}
|
{:ok, %{endpoints: endpoints, interval: interval}}
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_info(:tick, state) do
|
def handle_info(:tick, state) do
|
||||||
[endpoint | _] = state.endpoints
|
[endpoint | _] = state.endpoints
|
||||||
|
interval = state.interval
|
||||||
|
|
||||||
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
||||||
|
|
||||||
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
|
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
|
||||||
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
|
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
|
||||||
|
|
||||||
Process.send_after(self(), :tick, @interval)
|
Process.send_after(self(), :tick, interval)
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user