Set correct type
This commit is contained in:
@@ -4,10 +4,9 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
|
|||||||
alias KafkaexLagExporter.ConsumerOffset
|
alias KafkaexLagExporter.ConsumerOffset
|
||||||
alias KafkaexLagExporter.KafkaUtils
|
alias KafkaexLagExporter.KafkaUtils
|
||||||
|
|
||||||
# TODO fix type
|
|
||||||
@spec get(KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint()) :: %{
|
@spec get(KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint()) :: %{
|
||||||
lags: list(binary),
|
lags: list(ConsumerOffset.t()),
|
||||||
sum: list(binary)
|
sum: list(ConsumerOffset.t())
|
||||||
}
|
}
|
||||||
def get(endpoint) do
|
def get(endpoint) do
|
||||||
consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint)
|
consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint)
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
|
|||||||
def handle_info(:tick, state) do
|
def handle_info(:tick, state) do
|
||||||
[endpoint | _] = state.endpoints
|
[endpoint | _] = state.endpoints
|
||||||
|
|
||||||
%{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
||||||
|
|
||||||
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
|
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
|
||||||
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
|
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
|
||||||
|
|||||||
@@ -36,9 +36,13 @@ defmodule KafkaexLagExporter.Metrics do
|
|||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@spec group_sum_lag(
|
||||||
|
KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
|
||||||
|
list(ConsumerOffset.t())
|
||||||
|
) :: :ok
|
||||||
@doc false
|
@doc false
|
||||||
def group_sum_lag({host, _port}, cunsumer_offsets) do
|
def group_sum_lag({host, _port}, consumer_offsets) do
|
||||||
Enum.each(cunsumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
|
Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
|
||||||
lag = elem(consumer_offset.lag, 1)
|
lag = elem(consumer_offset.lag, 1)
|
||||||
|
|
||||||
:telemetry.execute(
|
:telemetry.execute(
|
||||||
@@ -55,6 +59,10 @@ defmodule KafkaexLagExporter.Metrics do
|
|||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@spec group_lag_per_partition(
|
||||||
|
KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
|
||||||
|
list(ConsumerOffset.t())
|
||||||
|
) :: :ok
|
||||||
@doc false
|
@doc false
|
||||||
def group_lag_per_partition({host, _port}, consumer_offsets) do
|
def group_lag_per_partition({host, _port}, consumer_offsets) do
|
||||||
Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
|
Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
|
||||||
|
|||||||
Reference in New Issue
Block a user