Compare commits

...

8 Commits

SHA1 Message Date
baed79e1ba Add more tags to metrics 2024-03-28 23:49:44 +01:00
a68a0126c8 Update test for KafkaUtils 2024-03-28 23:46:26 +01:00
1630d1dcda Remove empty module 2024-03-28 23:45:56 +01:00
4922da165c Return more data for members 2024-03-28 23:45:40 +01:00
9fbf7a98b8 Fix init cluster script 2024-03-28 23:44:25 +01:00
08b5923d52 Simplify code 2024-03-28 21:23:51 +01:00
6bf68b74ec Install test watcher 2024-03-28 19:05:59 +01:00
1cc9afbcf4 Adjust async level of tests 2024-03-28 18:41:35 +01:00
15 changed files with 320 additions and 117 deletions

View File

@@ -2,7 +2,7 @@
 kind delete cluster
-kind create cluster --config deployment/kind.yaml
+kind create cluster --config kind.yaml
 kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m

kind.yaml Normal file
View File

@@ -0,0 +1,24 @@
+kind: Cluster
+apiVersion: kind.x-k8s.io/v1alpha4
+nodes:
+- role: control-plane
+  kubeadmConfigPatches:
+  - |
+    kind: InitConfiguration
+    nodeRegistration:
+      kubeletExtraArgs:
+        node-labels: "ingress-ready=true"
+  extraPortMappings:
+  - containerPort: 80
+    hostPort: 80
+    protocol: TCP
+  - containerPort: 443
+    hostPort: 443
+    protocol: TCP
+  # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
+- role: worker
+  # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
+- role: worker
+  # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
+- role: worker
+  # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f

View File

@@ -1,13 +0,0 @@
-defmodule KafkaexLagExporter do
-  @moduledoc """
-  KafkaexLagExporter keeps the contexts that define your domain
-  and business logic.
-
-  Contexts are also responsible for managing your data, regardless
-  if it comes from the database, an external API or others.
-  """
-
-  def hello() do
-    :world
-  end
-end

View File

@@ -15,7 +15,7 @@ defmodule KafkaexLagExporter.Application do
       {Phoenix.PubSub, name: KafkaexLagExporter.PubSub},
       # Start the Endpoint (http/https)
       KafkaexLagExporterWeb.Endpoint,
-      KafkaexLagExporter.ConsumerOffset
+      KafkaexLagExporter.ConsumerOffsetRunner
     ]

     # See https://hexdocs.pm/elixir/Supervisor.html

View File

@@ -15,7 +15,7 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
   @callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary)
-  @callback topic_names_for_consumer_groups(
+  @callback get_consumer_group_info(
               {host :: atom, port :: non_neg_integer},
               list(binary),
               list(binary)
@@ -35,8 +35,8 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
   def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port})

-  def topic_names_for_consumer_groups(endpoint, list, consumer_group_names),
-    do: impl().topic_names_for_consumer_groups(endpoint, list, consumer_group_names)
+  def get_consumer_group_info(endpoint, list, consumer_group_names),
+    do: impl().get_consumer_group_info(endpoint, list, consumer_group_names)

   defp impl,
     do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils)

View File

@@ -1,41 +1,17 @@
 defmodule KafkaexLagExporter.ConsumerOffset do
-  @moduledoc "Genserver implementation to set offset metrics for consumer groups"
+  @moduledoc "Struct holding all relevant telemetry information of consumers"

-  use GenServer
+  @type t :: %__MODULE__{
+          consumer_group: binary,
+          topic: binary,
+          lag: list({partition :: non_neg_integer, lag :: non_neg_integer}),
+          consumer_id: binary,
+          member_host: binary
+        }

-  require Logger
-
-  @interval 5_000
-
-  def start_link(default) when is_list(default) do
-    GenServer.start_link(__MODULE__, default, name: __MODULE__)
-  end
-
-  @impl true
-  def init(_) do
-    Logger.info("Starting #{__MODULE__}")
-
-    clients = Application.get_env(:brod, :clients)
-    endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
-    Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
-
-    Process.send_after(self(), :tick, @interval)
-
-    {:ok, %{endpoints: endpoints}}
-  end
-
-  @impl true
-  def handle_info(:tick, state) do
-    [endpoint | _] = state.endpoints
-
-    %{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
-
-    KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
-    KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
-
-    Process.send_after(self(), :tick, @interval)
-
-    {:noreply, state}
-  end
+  defstruct consumer_group: "",
+            topic: "",
+            lag: [],
+            consumer_id: "",
+            member_host: ""
 end
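For readers skimming the diff: ConsumerOffset is now a plain data carrier. A minimal usage sketch (field values are illustrative, not taken from the repo):

alias KafkaexLagExporter.ConsumerOffset

# Per-partition lag is carried as {partition, lag} tuples.
offset = %ConsumerOffset{
  consumer_group: "example-group",
  topic: "example-topic",
  lag: [{0, 23}, {1, 42}],
  consumer_id: "example-consumer-id",
  member_host: "127.0.0.1"
}

# Total lag across partitions, mirroring what sum_topic_lag/2 computes below:
total = Enum.reduce(offset.lag, 0, fn {_partition, lag}, acc -> acc + lag end)
# total == 65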

View File

@@ -1,8 +1,7 @@
 defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
   @moduledoc "Calculate summarized lag for each consumer group"

-  require Logger
+  alias KafkaexLagExporter.ConsumerOffset
   alias KafkaexLagExporter.KafkaUtils

   # TODO fix type
@@ -14,30 +13,38 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
     consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint)

     consumer_lags =
-      KafkaUtils.topic_names_for_consumer_groups(
-        endpoint,
-        [],
-        consumer_group_names
-      )
-      |> Enum.map(fn {consumer_group, topics} ->
-        {consumer_group, get_lag_for_consumer(consumer_group, topics)}
-      end)
+      KafkaUtils.get_consumer_group_info(endpoint, [], consumer_group_names)
+      |> Enum.flat_map(&get_lag_per_topic(&1))

     consumer_lag_sum = get_lag_for_consumer_sum(consumer_lags)

     %{lags: consumer_lags, sum: consumer_lag_sum}
   end

-  defp get_lag_for_consumer(consumer_group, topics) do
-    topics
-    |> Enum.flat_map(fn topic ->
-      KafkaUtils.lag(topic, consumer_group, :client1)
+  @spec get_lag_per_topic(
+          {consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
+           member_host :: binary}
+        ) :: list(ConsumerOffset.t())
+  defp get_lag_per_topic({consumer_group, topics, consumer_id, member_host}) do
+    Enum.map(topics, fn topic ->
+      lag = KafkaUtils.lag(topic, consumer_group, :client1)
+
+      %ConsumerOffset{
+        consumer_group: consumer_group,
+        topic: topic,
+        lag: lag,
+        consumer_id: consumer_id,
+        member_host: member_host
+      }
     end)
   end

+  @spec get_lag_for_consumer_sum(list(ConsumerOffset.t())) :: list(ConsumerOffset.t())
   defp get_lag_for_consumer_sum(lags_per_consumer_group) do
-    lags_per_consumer_group
-    |> Enum.map(fn {topic, lag_per_partition} -> {topic, sum_topic_lag(lag_per_partition, 0)} end)
+    Enum.map(lags_per_consumer_group, fn consumer_offset ->
+      lag_sum = sum_topic_lag(consumer_offset.lag, 0)
+      %ConsumerOffset{consumer_offset | lag: {0, lag_sum}}
+    end)
   end

   defp sum_topic_lag([], acc), do: acc
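Taken together with the updated tests further down, get/1 now returns a map with :lags (one ConsumerOffset per group/topic pairing, lag as per-partition tuples) and :sum (the same structs with lag collapsed to a single {0, total} tuple). A consuming sketch, assuming the fallback endpoint used by the runner:

%{lags: lags, sum: sums} = KafkaexLagExporter.ConsumerOffsetFetcher.get({"redpanda", 29_092})

Enum.each(sums, fn %KafkaexLagExporter.ConsumerOffset{lag: {0, total}} = offset ->
  IO.puts("#{offset.consumer_group}/#{offset.topic} total lag: #{total}")
end)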

View File

@@ -0,0 +1,41 @@
+defmodule KafkaexLagExporter.ConsumerOffsetRunner do
+  @moduledoc "GenServer implementation to set offset metrics for consumer groups"
+
+  use GenServer
+
+  require Logger
+
+  @interval 5_000
+
+  def start_link(default) when is_list(default) do
+    GenServer.start_link(__MODULE__, default, name: __MODULE__)
+  end
+
+  @impl true
+  def init(_) do
+    Logger.info("Starting #{__MODULE__}")
+
+    clients = Application.get_env(:brod, :clients)
+    endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
+    Logger.info("Received Kafka endpoints: #{inspect(endpoints)}")
+
+    Process.send_after(self(), :tick, @interval)
+
+    {:ok, %{endpoints: endpoints}}
+  end
+
+  @impl true
+  def handle_info(:tick, state) do
+    [endpoint | _] = state.endpoints
+
+    %{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
+
+    KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
+    KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
+
+    Process.send_after(self(), :tick, @interval)
+
+    {:noreply, state}
+  end
+end
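The runner is registered in the supervision tree in the application.ex hunk above; for ad-hoc experimentation it can also be started directly. A minimal sketch, assuming the :brod client config (or the {"redpanda", 29_092} fallback) points at a reachable broker:

{:ok, pid} = KafkaexLagExporter.ConsumerOffsetRunner.start_link([])

# The GenServer re-arms itself every 5 seconds; a collection cycle
# can also be triggered immediately:
send(pid, :tick)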

View File

@@ -82,25 +82,35 @@ defmodule KafkaexLagExporter.KafkaUtils do
     [{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], [])

     groups
-    |> Enum.filter(fn {_, _, protocol} -> protocol == "consumer" end)
-    |> Enum.map(fn {_, group_name, "consumer"} -> group_name end)
+    |> Enum.filter(fn {_, _, protocol} -> protocol === "consumer" end)
+    |> Enum.map(fn {_, group_name, _} -> group_name end)
   end

   @impl true
-  def topic_names_for_consumer_groups(endpoint, list \\ [], consumer_group_names) do
-    KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
-    |> get_topic_names_for_consumer_groups
-  end
+  def get_consumer_group_info(endpoint, list \\ [], consumer_group_names) do
+    {:ok, group_descriptions} = KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)

-  defp get_topic_names_for_consumer_groups({:ok, group_descriptions}) do
     group_descriptions
-    |> Enum.map(fn %{group_id: consumer_group, members: members} -> [consumer_group, members] end)
-    |> Enum.map(fn [consumer_group, members] -> {consumer_group, get_topic_names(members)} end)
+    |> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
+      get_member_info(members)
+      |> Enum.map(fn {topic_names, consumer_id, member_host} ->
+        {consumer_group, topic_names, consumer_id, member_host}
+      end)
+    end)
   end

-  defp get_topic_names(members) do
-    Enum.flat_map(members, fn member ->
-      KafkaexLagExporter.TopicNameParser.parse_topic_names(member.member_assignment)
+  @spec get_member_info(
+          list(%{client_host: binary, member_assignment: binary, member_id: binary})
+        ) ::
+          list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
+  defp get_member_info(members) do
+    Enum.map(members, fn %{
+                           client_host: member_host,
+                           member_assignment: member_assignment,
+                           member_id: consumer_id
+                         } ->
+      topic_names = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
+      {topic_names, consumer_id, member_host}
     end)
   end
 end
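As the test fixtures later in this diff show, get_consumer_group_info/3 flattens groups into one 4-tuple per member:

# Illustrative return value, matching the fixtures in the fetcher test below:
[
  {"test_consumer_1", ["test_topic_1", "test_topic_2"], "test_consumer_id1", "127.0.0.1"},
  {"test_consumer_2", ["test_topic_3"], "test_consumer_id2", "127.0.0.1"}
]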

View File

@@ -3,6 +3,8 @@ defmodule KafkaexLagExporter.Metrics do
   use PromEx.Plugin

+  alias KafkaexLagExporter.ConsumerOffset
+
   require Logger

   @kafka_event :kafka
@@ -21,52 +23,52 @@ defmodule KafkaexLagExporter.Metrics do
           event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
           description: "Sum of group offset lag across topic partitions",
           measurement: :lag,
-          # TODO: add more tags like member_host, consumer_id, client_id, ...
-          tags: [:cluster_name, :group, :topic]
+          tags: [:cluster_name, :group, :topic, :consumer_id, :member_host]
         ),
         last_value(
           [@kafka_event, :consumergroup, :group, :lag],
           event_name: [@kafka_event, :consumergroup, :group, :lag],
           description: "Group offset lag of a partition",
           measurement: :lag,
-          # TODO: add more tags like member_host, consumer_id, client_id, ...
-          tags: [:cluster_name, :group, :partition, :topic]
+          tags: [:cluster_name, :group, :partition, :topic, :consumer_id, :member_host]
         )
       ]
     )
   end

   @doc false
-  def group_sum_lag({host, _port}, consumer_lags) do
-    Enum.each(consumer_lags, fn {group_name, lag} ->
+  def group_sum_lag({host, _port}, consumer_offsets) do
+    Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
+      lag = elem(consumer_offset.lag, 1)
+
       :telemetry.execute(
         [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
-        %{
-          lag: lag
-        },
+        %{lag: lag},
         %{
           cluster_name: host,
-          group: group_name,
-          topic: []
+          group: consumer_offset.consumer_group,
+          topic: consumer_offset.topic,
+          consumer_id: consumer_offset.consumer_id,
+          member_host: consumer_offset.member_host
         }
       )
     end)
   end

   @doc false
-  def group_lag_per_partition({host, _port}, consumer_lags) do
-    Enum.each(consumer_lags, fn {group_name, lags} ->
-      Enum.each(lags, fn {partition, lag} ->
+  def group_lag_per_partition({host, _port}, consumer_offsets) do
+    Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
+      Enum.each(consumer_offset.lag, fn {partition, lag} ->
         :telemetry.execute(
           [@kafka_event, :consumergroup, :group, :lag],
-          %{
-            lag: lag
-          },
+          %{lag: lag},
           %{
             cluster_name: host,
-            group: group_name,
+            group: consumer_offset.consumer_group,
             partition: partition,
-            topic: []
+            topic: consumer_offset.topic,
+            consumer_id: consumer_offset.consumer_id,
+            member_host: consumer_offset.member_host
           }
         )
       end)
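Each :telemetry.execute/3 call above feeds the matching PromEx last_value metric, with the metadata map supplying the tag values declared in tags:. A hand-rolled emission for a single partition would look like this (values illustrative):

:telemetry.execute(
  [:kafka, :consumergroup, :group, :lag],
  %{lag: 42},
  %{
    cluster_name: "redpanda",
    group: "example-group",
    partition: 0,
    topic: "example-topic",
    consumer_id: "example-consumer-id",
    member_host: "127.0.0.1"
  }
)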

View File

@@ -37,12 +37,13 @@ defmodule KafkaexLagExporter.MixProject do
       {:telemetry_poller, "~> 1.0"},
       {:jason, "~> 1.4.0"},
       {:plug_cowboy, "~> 2.6.1"},
+      {:brod, "~> 3.17.0"},
+      {:prom_ex, "~> 1.8.0"},
+      {:telemetry, "~> 1.2"},
       {:credo, "~> 1.7.5", only: [:dev, :test], runtime: false},
       {:dialyxir, "~> 1.0", only: [:dev], runtime: false},
       {:patch, "~> 0.13.0", only: :test},
-      {:brod, "~> 3.17.0"},
-      {:prom_ex, "~> 1.8.0"},
-      {:telemetry, "~> 1.2"}
+      {:mix_test_watch, "~> 1.2.0", only: [:dev, :test], runtime: false}
     ]
   end
 end
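The newly added mix_test_watch dependency provides the mix test.watch task, which re-runs the suite on every file change (the "Install test watcher" commit); brod, prom_ex, and telemetry were only reordered within the list.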

View File

@@ -16,6 +16,7 @@
"kafka_protocol": {:hex, :kafka_protocol, "4.1.5", "d15e64994a8ca99716ab47db4132614359ac1bfa56d6c5b4341fdc1aa4041518", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "c956c9357fef493b7072a35d0c3e2be02aa5186c804a412d29e62423bb15e5d9"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mint": {:hex, :mint, "1.5.2", "4805e059f96028948870d23d7783613b7e6b0e2fb4e98d720383852a760067fd", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "d77d9e9ce4eb35941907f1d3df38d8f750c357865353e21d335bdcdf6d892a02"},
"mix_test_watch": {:hex, :mix_test_watch, "1.2.0", "1f9acd9e1104f62f280e30fc2243ae5e6d8ddc2f7f4dc9bceb454b9a41c82b42", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "278dc955c20b3fb9a3168b5c2493c2e5cffad133548d307e0a50c7f2cfbf34f6"},
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
"octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},

View File

@@ -1,10 +1,19 @@
 defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
-  use ExUnit.Case, async: true
+  use ExUnit.Case
   use Patch

+  alias KafkaexLagExporter.ConsumerOffset
+
   @test_consumer_group_name1 "test_consumer_1"
   @test_consumer_group_name2 "test_consumer_2"
-  @test_lags [{0, 23}, {1, 42}, {2, 666}]
+  @test_lags1 [{0, 23}, {1, 42}, {2, 666}]
+  @test_lags2 [{0, 1}, {1, 2}, {2, 3}]
+  @test_topic1 "test_topic_1"
+  @test_topic2 "test_topic_2"
+  @test_topic3 "test_topic_3"
+  @test_consumer_id1 "test_consumer_id1"
+  @test_consumer_id2 "test_consumer_id2"
+  @test_member_host "127.0.0.1"

   setup do
patch(
@@ -13,15 +22,16 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
       fn _ -> [@test_consumer_group_name1, @test_consumer_group_name2] end
     )

-    patch(KafkaexLagExporter.KafkaUtils, :lag, fn _, _, _ -> @test_lags end)
+    patch(KafkaexLagExporter.KafkaUtils, :lag, &lag(&1, &2, &3))

     patch(
       KafkaexLagExporter.KafkaUtils,
-      :topic_names_for_consumer_groups,
+      :get_consumer_group_info,
       fn _, _, _ ->
         [
-          {@test_consumer_group_name1, ["test_topic_1", "test_topic_2"]},
-          {@test_consumer_group_name2, ["test_topic_3"]}
+          {@test_consumer_group_name1, [@test_topic1, @test_topic2], @test_consumer_id1,
+           @test_member_host},
+          {@test_consumer_group_name2, [@test_topic3], @test_consumer_id2, @test_member_host}
         ]
       end
     )
@@ -35,13 +45,54 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
     %{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint)

     assert sum == [
-             {@test_consumer_group_name1, 1462},
-             {@test_consumer_group_name2, 731}
+             %ConsumerOffset{
+               consumer_group: @test_consumer_group_name1,
+               topic: @test_topic1,
+               lag: {0, 731},
+               consumer_id: @test_consumer_id1,
+               member_host: @test_member_host
+             },
+             %ConsumerOffset{
+               consumer_group: @test_consumer_group_name1,
+               topic: @test_topic2,
+               lag: {0, 6},
+               consumer_id: @test_consumer_id1,
+               member_host: @test_member_host
+             },
+             %ConsumerOffset{
+               consumer_group: @test_consumer_group_name2,
+               topic: @test_topic3,
+               lag: {0, 6},
+               consumer_id: @test_consumer_id2,
+               member_host: @test_member_host
+             }
           ]

     assert lags == [
-             {@test_consumer_group_name1, @test_lags ++ @test_lags},
-             {@test_consumer_group_name2, @test_lags}
+             %ConsumerOffset{
+               consumer_group: @test_consumer_group_name1,
+               topic: @test_topic1,
+               lag: @test_lags1,
+               consumer_id: @test_consumer_id1,
+               member_host: @test_member_host
+             },
+             %ConsumerOffset{
+               consumer_group: @test_consumer_group_name1,
+               topic: @test_topic2,
+               lag: @test_lags2,
+               consumer_id: @test_consumer_id1,
+               member_host: @test_member_host
+             },
+             %ConsumerOffset{
+               consumer_group: @test_consumer_group_name2,
+               topic: @test_topic3,
+               lag: @test_lags2,
+               consumer_id: @test_consumer_id2,
+               member_host: @test_member_host
+             }
           ]
   end

+  defp lag(@test_topic1, _, _), do: @test_lags1
+  defp lag(_, _, _), do: @test_lags2
 end
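The drop of async: true in this file is most likely deliberate: Patch swaps out module code globally, so patched modules are unsafe to share with concurrently running test files, whereas the TopicNameParser test at the bottom of the diff, which uses no patching, can safely gain async: true.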

test/kafka_utils_test.exs Normal file
View File

@@ -0,0 +1,104 @@
+defmodule KafkaexLagExporter.KafkaUtils.Test do
+  use ExUnit.Case
+  use Patch
+
+  alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
+  alias KafkaexLagExporter.KafkaUtils
+
+  @test_endpoint "test_host:1234"
+  @test_consumer_group "test-consumer-group"
+  @test_sock_opts [ssl: []]
+
+  setup do
+    patch(
+      KafkaUtils,
+      :connection,
+      fn _ -> {@test_endpoint, @test_sock_opts} end
+    )
+
+    :ok
+  end
+
+  describe "resolve_offsets" do
+    setup do
+      client = :client2
+      topic = "test-topic"
+
+      patch(
+        KafkaWrapper,
+        :get_partitions_count,
+        fn _, _ -> {:ok, 3} end
+      )
+
+      patch(
+        KafkaWrapper,
+        :resolve_offset,
+        fn _, _, _, _, _ -> {:ok, 42} end
+      )
+
+      offsets = KafkaUtils.resolve_offsets(topic, :earliest, client)
+
+      [client: client, offsets: offsets, topic: topic]
+    end
+
+    test "should get partition count", context do
+      expected_topic = context[:topic]
+      expected_client = context[:client]
+
+      assert_called(KafkaWrapper.get_partitions_count(expected_client, expected_topic))
+    end
+
+    test "should resolve offset per partition", context do
+      expected_topic = context[:topic]
+
+      assert_called(
+        KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 0, :earliest, @test_sock_opts)
+      )
+
+      assert_called(
+        KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 1, :earliest, @test_sock_opts)
+      )
+
+      assert_called(
+        KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 2, :earliest, @test_sock_opts)
+      )
+    end
+
+    test "should return offsets", context do
+      assert context[:offsets] == [{0, 42}, {1, 42}, {2, 42}]
+    end
+  end
+
+  describe "fetch_committed_offsets" do
+    setup do
+      partition_info1 = %{partition_index: 0, committed_offset: 23}
+      partition_info2 = %{partition_index: 1, committed_offset: 42}
+
+      patch(
+        KafkaWrapper,
+        :fetch_committed_offsets,
+        fn _, _, _ ->
+          {:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
+        end
+      )
+
+      offsets = KafkaUtils.fetch_committed_offsets("test-topic", @test_consumer_group, :test_atom)
+
+      [offsets: offsets]
+    end
+
+    test "should get the committed offsets from KafkaWrapper" do
+      assert_called(
+        KafkaWrapper.fetch_committed_offsets(
+          @test_endpoint,
+          @test_sock_opts,
+          @test_consumer_group
+        )
+      )
+    end
+
+    test "should return offsets", context do
+      assert context[:offsets] == [{0, 23}, {1, 42}]
+    end
+  end
+end

View File

@@ -1,6 +1,5 @@
 defmodule KafkaexLagExporterTopicNameParserTest do
-  use ExUnit.Case
-  doctest KafkaexLagExporter.TopicNameParser
+  use ExUnit.Case, async: true

   test "should parse single topic" do
     test_member_assignment =