Compare commits

..

8 Commits

Author SHA1 Message Date
baed79e1ba Add more tags to metrics 2024-03-28 23:49:44 +01:00
a68a0126c8 Update test for KafkaUtils 2024-03-28 23:46:26 +01:00
1630d1dcda Remove empty module 2024-03-28 23:45:56 +01:00
4922da165c Return more data for members 2024-03-28 23:45:40 +01:00
9fbf7a98b8 Fix init cluster script 2024-03-28 23:44:25 +01:00
08b5923d52 Simplify code 2024-03-28 21:23:51 +01:00
6bf68b74ec Install test watcher 2024-03-28 19:05:59 +01:00
1cc9afbcf4 Adjust async level of tests 2024-03-28 18:41:35 +01:00
15 changed files with 320 additions and 117 deletions

View File

@@ -2,7 +2,7 @@
kind delete cluster kind delete cluster
kind create cluster --config deployment/kind.yaml kind create cluster --config kind.yaml
kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m

24
kind.yaml Normal file
View File

@@ -0,0 +1,24 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraPortMappings:
- containerPort: 80
hostPort: 80
protocol: TCP
- containerPort: 443
hostPort: 443
protocol: TCP
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f

View File

@@ -1,13 +0,0 @@
defmodule KafkaexLagExporter do
@moduledoc """
KafkaexLagExporter keeps the contexts that define your domain
and business logic.
Contexts are also responsible for managing your data, regardless
if it comes from the database, an external API or others.
"""
def hello() do
:world
end
end

View File

@@ -15,7 +15,7 @@ defmodule KafkaexLagExporter.Application do
{Phoenix.PubSub, name: KafkaexLagExporter.PubSub}, {Phoenix.PubSub, name: KafkaexLagExporter.PubSub},
# Start the Endpoint (http/https) # Start the Endpoint (http/https)
KafkaexLagExporterWeb.Endpoint, KafkaexLagExporterWeb.Endpoint,
KafkaexLagExporter.ConsumerOffset KafkaexLagExporter.ConsumerOffsetRunner
] ]
# See https://hexdocs.pm/elixir/Supervisor.html # See https://hexdocs.pm/elixir/Supervisor.html

View File

@@ -15,7 +15,7 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
@callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary) @callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary)
@callback topic_names_for_consumer_groups( @callback get_consumer_group_info(
{host :: atom, port :: non_neg_integer}, {host :: atom, port :: non_neg_integer},
list(binary), list(binary),
list(binary) list(binary)
@@ -35,8 +35,8 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port}) def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port})
def topic_names_for_consumer_groups(endpoint, list, consumer_group_names), def get_consumer_group_info(endpoint, list, consumer_group_names),
do: impl().topic_names_for_consumer_groups(endpoint, list, consumer_group_names) do: impl().get_consumer_group_info(endpoint, list, consumer_group_names)
defp impl, defp impl,
do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils) do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils)

View File

@@ -1,41 +1,17 @@
defmodule KafkaexLagExporter.ConsumerOffset do defmodule KafkaexLagExporter.ConsumerOffset do
@moduledoc "Genserver implementation to set offset metrics for consumer groups" @moduledoc "Struct holding all relevant telemetry information of consumers"
use GenServer @type t :: %__MODULE__{
consumer_group: binary,
topic: binary,
lag: list({partition :: non_neg_integer, lag :: non_neg_integer}),
consumer_id: binary,
member_host: binary
}
require Logger defstruct consumer_group: "",
topic: "",
@interval 5_000 lag: [],
consumer_id: "",
def start_link(default) when is_list(default) do member_host: ""
GenServer.start_link(__MODULE__, default, name: __MODULE__)
end
@impl true
def init(_) do
Logger.info("Starting #{__MODULE__}")
clients = Application.get_env(:brod, :clients)
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
Process.send_after(self(), :tick, @interval)
{:ok, %{endpoints: endpoints}}
end
@impl true
def handle_info(:tick, state) do
[endpoint | _] = state.endpoints
%{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
Process.send_after(self(), :tick, @interval)
{:noreply, state}
end
end end

View File

@@ -1,8 +1,7 @@
defmodule KafkaexLagExporter.ConsumerOffsetFetcher do defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
@moduledoc "Calculate summarized lag for each consumer group" @moduledoc "Calculate summarized lag for each consumer group"
require Logger alias KafkaexLagExporter.ConsumerOffset
alias KafkaexLagExporter.KafkaUtils alias KafkaexLagExporter.KafkaUtils
# TODO fix type # TODO fix type
@@ -14,30 +13,38 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint) consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint)
consumer_lags = consumer_lags =
KafkaUtils.topic_names_for_consumer_groups( KafkaUtils.get_consumer_group_info(endpoint, [], consumer_group_names)
endpoint, |> Enum.flat_map(&get_lag_per_topic(&1))
[],
consumer_group_names
)
|> Enum.map(fn {consumer_group, topics} ->
{consumer_group, get_lag_for_consumer(consumer_group, topics)}
end)
consumer_lag_sum = get_lag_for_consumer_sum(consumer_lags) consumer_lag_sum = get_lag_for_consumer_sum(consumer_lags)
%{lags: consumer_lags, sum: consumer_lag_sum} %{lags: consumer_lags, sum: consumer_lag_sum}
end end
defp get_lag_for_consumer(consumer_group, topics) do @spec get_lag_per_topic(
topics {consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
|> Enum.flat_map(fn topic -> member_host :: binary}
KafkaUtils.lag(topic, consumer_group, :client1) ) :: list(ConsumerOffset.t())
defp get_lag_per_topic({consumer_group, topics, consumer_id, member_host}) do
Enum.map(topics, fn topic ->
lag = KafkaUtils.lag(topic, consumer_group, :client1)
%ConsumerOffset{
consumer_group: consumer_group,
topic: topic,
lag: lag,
consumer_id: consumer_id,
member_host: member_host
}
end) end)
end end
@spec get_lag_per_topic(list(ConsumerOffset.t())) :: list(ConsumerOffset.t())
defp get_lag_for_consumer_sum(lags_per_consumer_group) do defp get_lag_for_consumer_sum(lags_per_consumer_group) do
lags_per_consumer_group Enum.map(lags_per_consumer_group, fn consumer_offset ->
|> Enum.map(fn {topic, lag_per_partition} -> {topic, sum_topic_lag(lag_per_partition, 0)} end) lag_sum = sum_topic_lag(consumer_offset.lag, 0)
%ConsumerOffset{consumer_offset | lag: {0, lag_sum}}
end)
end end
defp sum_topic_lag([], acc), do: acc defp sum_topic_lag([], acc), do: acc

View File

@@ -0,0 +1,41 @@
defmodule KafkaexLagExporter.ConsumerOffsetRunner do
@moduledoc "Genserver implementation to set offset metrics for consumer groups"
use GenServer
require Logger
@interval 5_000
def start_link(default) when is_list(default) do
GenServer.start_link(__MODULE__, default, name: __MODULE__)
end
@impl true
def init(_) do
Logger.info("Starting #{__MODULE__}")
clients = Application.get_env(:brod, :clients)
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
Process.send_after(self(), :tick, @interval)
{:ok, %{endpoints: endpoints}}
end
@impl true
def handle_info(:tick, state) do
[endpoint | _] = state.endpoints
%{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
Process.send_after(self(), :tick, @interval)
{:noreply, state}
end
end

View File

@@ -82,25 +82,35 @@ defmodule KafkaexLagExporter.KafkaUtils do
[{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], []) [{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], [])
groups groups
|> Enum.filter(fn {_, _, protocol} -> protocol == "consumer" end) |> Enum.filter(fn {_, _, protocol} -> protocol === "consumer" end)
|> Enum.map(fn {_, group_name, "consumer"} -> group_name end) |> Enum.map(fn {_, group_name, _} -> group_name end)
end end
@impl true @impl true
def topic_names_for_consumer_groups(endpoint, list \\ [], consumer_group_names) do def get_consumer_group_info(endpoint, list \\ [], consumer_group_names) do
KafkaWrapper.describe_groups(endpoint, list, consumer_group_names) {:ok, group_descriptions} = KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
|> get_topic_names_for_consumer_groups
end
defp get_topic_names_for_consumer_groups({:ok, group_descriptions}) do
group_descriptions group_descriptions
|> Enum.map(fn %{group_id: consumer_group, members: members} -> [consumer_group, members] end) |> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
|> Enum.map(fn [consumer_group, members] -> {consumer_group, get_topic_names(members)} end) get_member_info(members)
|> Enum.map(fn {topic_names, consumer_id, member_host} ->
{consumer_group, topic_names, consumer_id, member_host}
end)
end)
end end
defp get_topic_names(members) do @spec get_member_info(
Enum.flat_map(members, fn member -> list(%{client_host: binary, member_assignment: binary, member_id: binary})
KafkaexLagExporter.TopicNameParser.parse_topic_names(member.member_assignment) ) ::
list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
defp get_member_info(members) do
Enum.map(members, fn %{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
} ->
topic_names = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
{topic_names, consumer_id, member_host}
end) end)
end end
end end

View File

@@ -3,6 +3,8 @@ defmodule KafkaexLagExporter.Metrics do
use PromEx.Plugin use PromEx.Plugin
alias KafkaexLagExporter.ConsumerOffset
require Logger require Logger
@kafka_event :kafka @kafka_event :kafka
@@ -21,52 +23,52 @@ defmodule KafkaexLagExporter.Metrics do
event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag], event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
description: "Sum of group offset lag across topic partitions", description: "Sum of group offset lag across topic partitions",
measurement: :lag, measurement: :lag,
# TODO: add more tags like member_host, consumer_id, client_id, ... tags: [:cluster_name, :group, :topic, :consumer_id, :member_host]
tags: [:cluster_name, :group, :topic]
), ),
last_value( last_value(
[@kafka_event, :consumergroup, :group, :lag], [@kafka_event, :consumergroup, :group, :lag],
event_name: [@kafka_event, :consumergroup, :group, :lag], event_name: [@kafka_event, :consumergroup, :group, :lag],
description: "Group offset lag of a partition", description: "Group offset lag of a partition",
measurement: :lag, measurement: :lag,
# TODO: add more tags like member_host, consumer_id, client_id, ... tags: [:cluster_name, :group, :partition, :topic, :consumer_id, :member_host]
tags: [:cluster_name, :group, :partition, :topic]
) )
] ]
) )
end end
@doc false @doc false
def group_sum_lag({host, _port}, consumer_lags) do def group_sum_lag({host, _port}, cunsumer_offsets) do
Enum.each(consumer_lags, fn {group_name, lag} -> Enum.each(cunsumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
lag = elem(consumer_offset.lag, 1)
:telemetry.execute( :telemetry.execute(
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag], [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
%{ %{lag: lag},
lag: lag
},
%{ %{
cluster_name: host, cluster_name: host,
group: group_name, group: consumer_offset.consumer_group,
topic: [] topic: consumer_offset.topic,
consumer_id: consumer_offset.consumer_id,
member_host: consumer_offset.member_host
} }
) )
end) end)
end end
@doc false @doc false
def group_lag_per_partition({host, _port}, consumer_lags) do def group_lag_per_partition({host, _port}, consumer_offsets) do
Enum.each(consumer_lags, fn {group_name, lags} -> Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
Enum.each(lags, fn {partition, lag} -> Enum.each(consumer_offset.lag, fn {partition, lag} ->
:telemetry.execute( :telemetry.execute(
[@kafka_event, :consumergroup, :group, :lag], [@kafka_event, :consumergroup, :group, :lag],
%{ %{lag: lag},
lag: lag
},
%{ %{
cluster_name: host, cluster_name: host,
group: group_name, group: consumer_offset.consumer_group,
partition: partition, partition: partition,
topic: [] topic: consumer_offset.topic,
consumer_id: consumer_offset.consumer_id,
member_host: consumer_offset.member_host
} }
) )
end) end)

View File

@@ -37,12 +37,13 @@ defmodule KafkaexLagExporter.MixProject do
{:telemetry_poller, "~> 1.0"}, {:telemetry_poller, "~> 1.0"},
{:jason, "~> 1.4.0"}, {:jason, "~> 1.4.0"},
{:plug_cowboy, "~> 2.6.1"}, {:plug_cowboy, "~> 2.6.1"},
{:brod, "~> 3.17.0"},
{:prom_ex, "~> 1.8.0"},
{:telemetry, "~> 1.2"},
{:credo, "~> 1.7.5", only: [:dev, :test], runtime: false}, {:credo, "~> 1.7.5", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.0", only: [:dev], runtime: false}, {:dialyxir, "~> 1.0", only: [:dev], runtime: false},
{:patch, "~> 0.13.0", only: :test}, {:patch, "~> 0.13.0", only: :test},
{:brod, "~> 3.17.0"}, {:mix_test_watch, "~> 1.2.0", only: [:dev, :test], runtime: false}
{:prom_ex, "~> 1.8.0"},
{:telemetry, "~> 1.2"}
] ]
end end
end end

View File

@@ -16,6 +16,7 @@
"kafka_protocol": {:hex, :kafka_protocol, "4.1.5", "d15e64994a8ca99716ab47db4132614359ac1bfa56d6c5b4341fdc1aa4041518", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "c956c9357fef493b7072a35d0c3e2be02aa5186c804a412d29e62423bb15e5d9"}, "kafka_protocol": {:hex, :kafka_protocol, "4.1.5", "d15e64994a8ca99716ab47db4132614359ac1bfa56d6c5b4341fdc1aa4041518", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "c956c9357fef493b7072a35d0c3e2be02aa5186c804a412d29e62423bb15e5d9"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mint": {:hex, :mint, "1.5.2", "4805e059f96028948870d23d7783613b7e6b0e2fb4e98d720383852a760067fd", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "d77d9e9ce4eb35941907f1d3df38d8f750c357865353e21d335bdcdf6d892a02"}, "mint": {:hex, :mint, "1.5.2", "4805e059f96028948870d23d7783613b7e6b0e2fb4e98d720383852a760067fd", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "d77d9e9ce4eb35941907f1d3df38d8f750c357865353e21d335bdcdf6d892a02"},
"mix_test_watch": {:hex, :mix_test_watch, "1.2.0", "1f9acd9e1104f62f280e30fc2243ae5e6d8ddc2f7f4dc9bceb454b9a41c82b42", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "278dc955c20b3fb9a3168b5c2493c2e5cffad133548d307e0a50c7f2cfbf34f6"},
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"}, "nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"}, "nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
"octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"}, "octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},

View File

@@ -1,10 +1,19 @@
defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
use ExUnit.Case, async: true use ExUnit.Case
use Patch use Patch
alias KafkaexLagExporter.ConsumerOffset
@test_consumer_group_name1 "test_consumer_1" @test_consumer_group_name1 "test_consumer_1"
@test_consumer_group_name2 "test_consumer_2" @test_consumer_group_name2 "test_consumer_2"
@test_lags [{0, 23}, {1, 42}, {2, 666}] @test_lags1 [{0, 23}, {1, 42}, {2, 666}]
@test_lags2 [{0, 1}, {1, 2}, {2, 3}]
@test_topic1 "test_topic_1"
@test_topic2 "test_topic_2"
@test_topic3 "test_topic_3"
@test_consumer_id1 "test_consumer_id1"
@test_consumer_id2 "test_consumer_id2"
@test_member_host "127.0.0.1"
setup do setup do
patch( patch(
@@ -13,15 +22,16 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
fn _ -> [@test_consumer_group_name1, @test_consumer_group_name2] end fn _ -> [@test_consumer_group_name1, @test_consumer_group_name2] end
) )
patch(KafkaexLagExporter.KafkaUtils, :lag, fn _, _, _ -> @test_lags end) patch(KafkaexLagExporter.KafkaUtils, :lag, &lag(&1, &2, &3))
patch( patch(
KafkaexLagExporter.KafkaUtils, KafkaexLagExporter.KafkaUtils,
:topic_names_for_consumer_groups, :get_consumer_group_info,
fn _, _, _ -> fn _, _, _ ->
[ [
{@test_consumer_group_name1, ["test_topic_1", "test_topic_2"]}, {@test_consumer_group_name1, [@test_topic1, @test_topic2], @test_consumer_id1,
{@test_consumer_group_name2, ["test_topic_3"]} @test_member_host},
{@test_consumer_group_name2, [@test_topic3], @test_consumer_id2, @test_member_host}
] ]
end end
) )
@@ -35,13 +45,54 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
%{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint) %{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint)
assert sum == [ assert sum == [
{@test_consumer_group_name1, 1462}, %ConsumerOffset{
{@test_consumer_group_name2, 731} consumer_group: @test_consumer_group_name1,
topic: @test_topic1,
lag: {0, 731},
consumer_id: @test_consumer_id1,
member_host: @test_member_host
},
%ConsumerOffset{
consumer_group: @test_consumer_group_name1,
topic: @test_topic2,
lag: {0, 6},
consumer_id: @test_consumer_id1,
member_host: @test_member_host
},
%ConsumerOffset{
consumer_group: @test_consumer_group_name2,
topic: @test_topic3,
lag: {0, 6},
consumer_id: @test_consumer_id2,
member_host: @test_member_host
}
] ]
assert lags == [ assert lags == [
{@test_consumer_group_name1, @test_lags ++ @test_lags}, %ConsumerOffset{
{@test_consumer_group_name2, @test_lags} consumer_group: @test_consumer_group_name1,
topic: @test_topic1,
lag: @test_lags1,
consumer_id: @test_consumer_id1,
member_host: @test_member_host
},
%ConsumerOffset{
consumer_group: @test_consumer_group_name1,
topic: @test_topic2,
lag: @test_lags2,
consumer_id: @test_consumer_id1,
member_host: @test_member_host
},
%ConsumerOffset{
consumer_group: @test_consumer_group_name2,
topic: @test_topic3,
lag: @test_lags2,
consumer_id: @test_consumer_id2,
member_host: @test_member_host
}
] ]
end end
defp lag(@test_topic1, _, _), do: @test_lags1
defp lag(_, _, _), do: @test_lags2
end end

104
test/kafka_utils_test.exs Normal file
View File

@@ -0,0 +1,104 @@
defmodule KafkaexLagExporter.KafkaUtils.Test do
use ExUnit.Case
use Patch
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
alias KafkaexLagExporter.KafkaUtils
@test_endpoint "test_host:1234"
@test_consumer_group "test-consumer-group"
@test_sock_opts [ssl: []]
setup do
patch(
KafkaUtils,
:connection,
fn _ -> {@test_endpoint, @test_sock_opts} end
)
:ok
end
describe "resolve_offsets" do
setup do
client = :client2
topic = "test-topic"
patch(
KafkaWrapper,
:get_partitions_count,
fn _, _ -> {:ok, 3} end
)
patch(
KafkaWrapper,
:resolve_offset,
fn _, _, _, _, _ -> {:ok, 42} end
)
offsets = KafkaUtils.resolve_offsets(topic, :earliest, client)
[client: client, offsets: offsets, topic: topic]
end
test "should get partition count", context do
expected_topic = context[:topic]
expected_client = context[:client]
assert_called(KafkaWrapper.get_partitions_count(expected_client, expected_topic))
end
test "should resolve offset per partition", context do
expected_topic = context[:topic]
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 0, :earliest, @test_sock_opts)
)
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 1, :earliest, @test_sock_opts)
)
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 2, :earliest, @test_sock_opts)
)
end
test "should return offsets", context do
assert context[:offsets] == [{0, 42}, {1, 42}, {2, 42}]
end
end
describe "fetch_committed_offsets" do
setup do
partition_info1 = %{partition_index: 0, committed_offset: 23}
partition_info2 = %{partition_index: 1, committed_offset: 42}
patch(
KafkaWrapper,
:fetch_committed_offsets,
fn _, _, _ ->
{:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
end
)
offsets = KafkaUtils.fetch_committed_offsets("test-topic", @test_consumer_group, :test_atom)
[offsets: offsets]
end
test "should get the committed offsets from KafkaWrapper" do
assert_called(
KafkaWrapper.fetch_committed_offsets(
@test_endpoint,
@test_sock_opts,
@test_consumer_group
)
)
end
test "should return offsets", context do
assert context[:offsets] == [{0, 23}, {1, 42}]
end
end
end

View File

@@ -1,6 +1,5 @@
defmodule KafkaexLagExporterTopicNameParserTest do defmodule KafkaexLagExporterTopicNameParserTest do
use ExUnit.Case use ExUnit.Case, async: true
doctest KafkaexLagExporter.TopicNameParser
test "should parse single topic" do test "should parse single topic" do
test_member_assignment = test_member_assignment =