diff --git a/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex b/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex index 38abeb8..1435d20 100644 --- a/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex +++ b/lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex @@ -4,10 +4,9 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do alias KafkaexLagExporter.ConsumerOffset alias KafkaexLagExporter.KafkaUtils - # TODO fix type @spec get(KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint()) :: %{ - lags: list(binary), - sum: list(binary) + lags: list(ConsumerOffset.t()), + sum: list(ConsumerOffset.t()) } def get(endpoint) do consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint) diff --git a/lib/kafkaex_lag_exporter/consumer_offset_runner.ex b/lib/kafkaex_lag_exporter/consumer_offset_runner.ex index 591c1d4..41e2ea1 100644 --- a/lib/kafkaex_lag_exporter/consumer_offset_runner.ex +++ b/lib/kafkaex_lag_exporter/consumer_offset_runner.ex @@ -29,7 +29,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do def handle_info(:tick, state) do [endpoint | _] = state.endpoints - %{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint) + %{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint) KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags) KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum) diff --git a/lib/kafkaex_lag_exporter/metrics.ex b/lib/kafkaex_lag_exporter/metrics.ex index 3e2129f..6118410 100644 --- a/lib/kafkaex_lag_exporter/metrics.ex +++ b/lib/kafkaex_lag_exporter/metrics.ex @@ -36,9 +36,13 @@ defmodule KafkaexLagExporter.Metrics do ) end + @spec group_sum_lag( + KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(), + list(ConsumerOffset.t()) + ) :: :ok @doc false - def group_sum_lag({host, _port}, cunsumer_offsets) do - Enum.each(cunsumer_offsets, fn %ConsumerOffset{} = consumer_offset -> + def group_sum_lag({host, _port}, consumer_offsets) do + Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset -> lag = elem(consumer_offset.lag, 1) :telemetry.execute( @@ -55,6 +59,10 @@ defmodule KafkaexLagExporter.Metrics do end) end + @spec group_lag_per_partition( + KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(), + list(ConsumerOffset.t()) + ) :: :ok @doc false def group_lag_per_partition({host, _port}, consumer_offsets) do Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->