diff --git a/lib/kafkaex_lag_exporter/behaviours/kafka_utils_behaviour.ex b/lib/kafkaex_lag_exporter/behaviours/kafka_utils_behaviour.ex index bafa2b7..1ecbcca 100644 --- a/lib/kafkaex_lag_exporter/behaviours/kafka_utils_behaviour.ex +++ b/lib/kafkaex_lag_exporter/behaviours/kafka_utils_behaviour.ex @@ -3,16 +3,6 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do @callback connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()} - @callback resolve_offsets(binary, :earliest | :latest, atom) :: - list({non_neg_integer, integer}) - - @callback fetch_committed_offsets(binary, binary, atom) :: - list({non_neg_integer, non_neg_integer}) - - @callback lag(binary, binary, atom) :: list({non_neg_integer, integer}) - - @callback lag_total(binary, binary, atom) :: non_neg_integer - @callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary) @callback get_consumer_group_info( @@ -21,23 +11,20 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do list(binary) ) :: list({consumer_group :: binary, topics :: list(binary)}) + @callback lag(binary, binary, atom) :: list({non_neg_integer, integer}) + def connection(client), do: impl().connection(client) - def resolve_offsets(topic, type, client), do: impl().resolve_offsets(topic, type, client) - - def fetch_committed_offsets(topic, consumer_group, client), - do: impl().fetch_committed_offsets(topic, consumer_group, client) - - def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client) - - def lag_total(topic, consumer_group, client), - do: impl().lag_total(topic, consumer_group, client) - - def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port}) + def get_consumer_group_names(endpoint), do: impl().get_consumer_group_names(endpoint) def get_consumer_group_info(endpoint, list, consumer_group_names), do: impl().get_consumer_group_info(endpoint, list, consumer_group_names) + def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client) + + def fetch_committed_offsets(topic, consumer_group, client), + do: impl().fetch_committed_offsets(topic, consumer_group, client) + defp impl, do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils) end diff --git a/lib/kafkaex_lag_exporter/kafka_utils.ex b/lib/kafkaex_lag_exporter/kafka_utils.ex index fed79ed..e8be8af 100644 --- a/lib/kafkaex_lag_exporter/kafka_utils.ex +++ b/lib/kafkaex_lag_exporter/kafka_utils.ex @@ -32,54 +32,8 @@ defmodule KafkaexLagExporter.KafkaUtils do end @impl true - def resolve_offsets(topic, type, client) do - {endpoints, sock_opts} = connection(client) - - {:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic) - - for i <- Range.new(0, partitions_count - 1), - {:ok, offset} = - KafkaWrapper.resolve_offset(endpoints, topic, i, type, sock_opts) do - {i, offset} - end - end - - @impl true - def fetch_committed_offsets(_topic, consumer_group, client) do - {endpoints, sock_opts} = connection(client) - - {:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group) - - for r <- response, - pr <- r[:partitions], - do: {pr[:partition_index], pr[:committed_offset]} - end - - @impl true - def lag(topic, consumer_group, client) do - offsets = - resolve_offsets(topic, :latest, client) - |> Enum.sort_by(fn {key, _value} -> key end) - - committed_offsets = - fetch_committed_offsets(topic, consumer_group, client) - |> Enum.sort_by(fn {key, _value} -> key end) - - for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do - {part, current - committed} - end - end - - @impl true - def lag_total(topic, consumer_group, client) do - for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do - acc -> acc + recs - end - end - - @impl true - def get_consumer_group_names({host, port}) do - [{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], []) + def get_consumer_group_names(endpoint) do + [{_, groups} | _] = KafkaWrapper.list_all_groups([endpoint], []) groups |> Enum.filter(fn {_, _, protocol} -> protocol === "consumer" end) @@ -93,12 +47,52 @@ defmodule KafkaexLagExporter.KafkaUtils do group_descriptions |> Enum.flat_map(fn %{group_id: consumer_group, members: members} -> get_member_info(members) - |> Enum.map(fn {topic_names, consumer_id, member_host} -> - {consumer_group, topic_names, consumer_id, member_host} + |> Enum.map(fn {topics, consumer_id, member_host} -> + {consumer_group, topics, consumer_id, member_host} end) end) end + @impl true + def lag(topic, consumer_group, client) do + offsets = + resolve_offsets(topic, client) + |> Enum.sort_by(fn {key, _value} -> key end) + + committed_offsets = + fetch_committed_offsets(topic, consumer_group, client) + |> Enum.sort_by(fn {key, _value} -> key end) + + for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do + {part, current - committed} + end + end + + @spec resolve_offsets(binary, atom) :: list({non_neg_integer, integer}) + defp resolve_offsets(topic, client) do + {endpoints, sock_opts} = connection(client) + + {:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic) + + for i <- Range.new(0, partitions_count - 1), + {:ok, offset} = + KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do + {i, offset} + end + end + + @spec fetch_committed_offsets(binary, binary, atom) :: + list({non_neg_integer, non_neg_integer}) + defp fetch_committed_offsets(_topic, consumer_group, client) do + {endpoints, sock_opts} = connection(client) + + {:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group) + + for r <- response, + pr <- r[:partitions], + do: {pr[:partition_index], pr[:committed_offset]} + end + @spec get_member_info( list(%{client_host: binary, member_assignment: binary, member_id: binary}) ) :: @@ -109,8 +103,8 @@ defmodule KafkaexLagExporter.KafkaUtils do member_assignment: member_assignment, member_id: consumer_id } -> - topic_names = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment) - {topic_names, consumer_id, member_host} + topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment) + {topics, consumer_id, member_host} end) end end diff --git a/test/kafka_utils_test.exs b/test/kafka_utils_test.exs index 8fe1422..f600b9f 100644 --- a/test/kafka_utils_test.exs +++ b/test/kafka_utils_test.exs @@ -5,9 +5,13 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper alias KafkaexLagExporter.KafkaUtils - @test_endpoint "test_host:1234" - @test_consumer_group "test-consumer-group" + @test_host "test_host" + @test_port "1234" + @test_endpoint {@test_host, @test_port} @test_sock_opts [ssl: []] + @test_group_name1 "test-consumer_group1" + @test_group_name2 "test-consumer_group" + @test_topic_name "test-topic_name" setup do patch( @@ -19,60 +23,129 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do :ok end - describe "resolve_offsets" do + describe "get_consumer_group_names" do setup do - client = :client2 - topic = "test-topic" + consumer_info1 = {:a, @test_group_name1, "consumer"} + consumer_info2 = {:b, @test_group_name2, "something-other"} patch( KafkaWrapper, - :get_partitions_count, - fn _, _ -> {:ok, 3} end + :list_all_groups, + fn _, _ -> + [ + { + @test_endpoint, + [consumer_info1, consumer_info2] + } + ] + end ) - patch( - KafkaWrapper, - :resolve_offset, - fn _, _, _, _, _ -> {:ok, 42} end - ) + group_names = KafkaUtils.get_consumer_group_names(@test_endpoint) - offsets = KafkaUtils.resolve_offsets(topic, :earliest, client) - - [client: client, offsets: offsets, topic: topic] + [group_names: group_names] end - test "should get partition count", context do - expected_topic = context[:topic] - expected_client = context[:client] - - assert_called(KafkaWrapper.get_partitions_count(expected_client, expected_topic)) + test "should list all groups" do + assert_called(KafkaWrapper.list_all_groups([@test_endpoint], [])) end - test "should resolve offset per partition", context do - expected_topic = context[:topic] - - assert_called( - KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 0, :earliest, @test_sock_opts) - ) - - assert_called( - KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 1, :earliest, @test_sock_opts) - ) - - assert_called( - KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 2, :earliest, @test_sock_opts) - ) - end - - test "should return offsets", context do - assert context[:offsets] == [{0, 42}, {1, 42}, {2, 42}] + test "should filter for consumer groups and return names", context do + assert context[:group_names] === [@test_group_name1] end end - describe "fetch_committed_offsets" do + describe "get_consumer_group_info" do setup do - partition_info1 = %{partition_index: 0, committed_offset: 23} - partition_info2 = %{partition_index: 1, committed_offset: 42} + consumer_group_name = "test-group-id1" + member_assignment = "test_member_assignment" + member_assignment = "test_member_assignment" + consumer_id = "test_consumer_id" + member_host = "test_member_host" + + patch( + KafkaWrapper, + :describe_groups, + fn _, _, _ -> + { + :ok, + [ + %{ + group_id: consumer_group_name, + members: [ + %{ + client_host: member_host, + member_assignment: member_assignment, + member_id: consumer_id + } + ] + } + ] + } + end + ) + + patch(KafkaexLagExporter.TopicNameParser, :parse_topic_names, fn _ -> [@test_topic_name] end) + + group_info = + KafkaUtils.get_consumer_group_info(@test_endpoint, [], [ + @test_group_name1, + @test_group_name2 + ]) + + [ + member_host: member_host, + group_info: group_info, + consumer_group_name: consumer_group_name, + member_assignment: member_assignment, + consumer_id: consumer_id + ] + end + + test "should describe groups" do + assert_called( + KafkaWrapper.describe_groups(@test_endpoint, [], [@test_group_name1, @test_group_name2]) + ) + end + + test "should parse topic names", context do + expected_member_assignment = context[:expected_member_assignment] + + assert_called( + KafkaexLagExporter.TopicNameParser.parse_topic_names(expected_member_assignment) + ) + end + + test "should filter for consumer groups and return names", context do + expected_consumer_group_name = context[:consumer_group_name] + expected_consumer_id = context[:consumer_id] + expected_member_host = context[:member_host] + + assert context[:group_info] === [ + { + expected_consumer_group_name, + [@test_topic_name], + expected_consumer_id, + expected_member_host + } + ] + end + end + + describe "lag" do + setup do + consumer_group = "test-consumer-group" + client = :client1 + committed_offset1 = 23 + committed_offset2 = 42 + resolved_offset = 50 + + partition_info1 = %{partition_index: 0, committed_offset: committed_offset1} + partition_info2 = %{partition_index: 1, committed_offset: committed_offset2} + + patch(KafkaWrapper, :get_partitions_count, fn _, _ -> {:ok, 2} end) + + patch(KafkaWrapper, :resolve_offset, fn _, _, _, _, _ -> {:ok, resolved_offset} end) patch( KafkaWrapper, @@ -82,23 +155,60 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do end ) - offsets = KafkaUtils.fetch_committed_offsets("test-topic", @test_consumer_group, :test_atom) + lag = KafkaUtils.lag(@test_topic_name, consumer_group, client) - [offsets: offsets] + [ + lag: lag, + consumer_group: consumer_group, + client: client, + committed_offset1: committed_offset1, + committed_offset2: committed_offset2, + resolved_offset: resolved_offset + ] end - test "should get the committed offsets from KafkaWrapper" do + test "should start connection", context do + expected_client = context[:client] + + assert_called(KafkaUtils.connection(expected_client)) + end + + test "should get partition count", context do + expected_client = context[:client] + + assert_called(KafkaWrapper.get_partitions_count(expected_client, @test_topic_name)) + end + + test "should resolve offsets for each partition" do + assert_called( + KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 0, :latest, @test_sock_opts) + ) + + assert_called( + KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 1, :latest, @test_sock_opts) + ) + end + + test "should fetch committed offsets", context do + expected_consumer_group = context[:consumer_group] + assert_called( KafkaWrapper.fetch_committed_offsets( @test_endpoint, @test_sock_opts, - @test_consumer_group + expected_consumer_group ) ) end - test "should return offsets", context do - assert context[:offsets] == [{0, 23}, {1, 42}] + test "should return calculated lag per partition", context do + expected_lag_partition_1 = context[:resolved_offset] - context[:committed_offset1] + expected_lag_partition_2 = context[:resolved_offset] - context[:committed_offset2] + + assert context[:lag] === [ + {0, expected_lag_partition_1}, + {1, expected_lag_partition_2} + ] end end end