Compare commits

...

4 Commits

Author SHA1 Message Date
f09ad58fb5 Fix type mismatch 2024-03-27 22:26:02 +01:00
3d835480d1 Use behaviour for KafkaUtils 2024-03-27 22:25:29 +01:00
ada8f12309 Remove implemented doc comment 2024-03-27 22:24:06 +01:00
ac5280acfd Add type spec 2024-03-27 22:23:12 +01:00
4 changed files with 47 additions and 66 deletions

View File

@@ -1,14 +1,18 @@
# source code taken from https://github.com/reachfh/brod_group_subscriber_example # source code taken from https://github.com/reachfh/brod_group_subscriber_example
defmodule KafkaexLagExporter.KafkaUtils do defmodule KafkaexLagExporter.KafkaUtils do
@behaviour KafkaexLagExporter.KafkaUtils.Behaviour
@moduledoc "Utility functions for dealing with Kafka" @moduledoc "Utility functions for dealing with Kafka"
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
require Logger
@default_client :client1 @default_client :client1
@type endpoint() :: {host :: atom(), port :: non_neg_integer()}
def connection, do: connection(@default_client) def connection, do: connection(@default_client)
@spec connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()} @impl true
def connection(client) do def connection(client) do
clients = Application.get_env(:brod, :clients) clients = Application.get_env(:brod, :clients)
config = clients[client] config = clients[client]
@@ -27,31 +31,31 @@ defmodule KafkaexLagExporter.KafkaUtils do
{endpoints, sock_opts} {endpoints, sock_opts}
end end
@spec resolve_offsets(binary(), :earliest | :latest, atom()) :: @impl true
list({non_neg_integer(), integer()})
def resolve_offsets(topic, type, client) do def resolve_offsets(topic, type, client) do
{endpoints, sock_opts} = connection(client) {endpoints, sock_opts} = connection(client)
{:ok, partitions_count} = :brod.get_partitions_count(client, topic) {:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
for i <- Range.new(0, partitions_count - 1), for i <- Range.new(0, partitions_count - 1),
{:ok, offset} = :brod.resolve_offset(endpoints, topic, i, type, sock_opts) do {:ok, offset} =
KafkaWrapper.resolve_offset(endpoints, topic, i, type, sock_opts) do
{i, offset} {i, offset}
end end
end end
@spec fetch_committed_offsets(binary(), binary(), atom()) :: @impl true
{non_neg_integer(), non_neg_integer()}
def fetch_committed_offsets(_topic, consumer_group, client) do def fetch_committed_offsets(_topic, consumer_group, client) do
{endpoints, sock_opts} = connection(client) {endpoints, sock_opts} = connection(client)
{:ok, response} = :brod.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
for r <- response, for r <- response,
pr <- r[:partitions], pr <- r[:partitions],
do: {pr[:partition_index], pr[:committed_offset]} do: {pr[:partition_index], pr[:committed_offset]}
end end
@spec lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()}) @impl true
def lag(topic, consumer_group, client) do def lag(topic, consumer_group, client) do
offsets = offsets =
resolve_offsets(topic, :latest, client) resolve_offsets(topic, :latest, client)
@@ -66,10 +70,37 @@ defmodule KafkaexLagExporter.KafkaUtils do
end end
end end
@spec lag_total(binary(), binary(), atom()) :: non_neg_integer() @impl true
def lag_total(topic, consumer_group, client) do def lag_total(topic, consumer_group, client) do
for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do
acc -> acc + recs acc -> acc + recs
end end
end end
@impl true
def get_consumer_group_names({host, port}) do
[{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], [])
groups
|> Enum.filter(fn {_, _, protocol} -> protocol == "consumer" end)
|> Enum.map(fn {_, group_name, "consumer"} -> group_name end)
end
@impl true
def topic_names_for_consumer_groups(endpoint, list \\ [], consumer_group_names) do
KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
|> get_topic_names_for_consumer_groups
end
defp get_topic_names_for_consumer_groups({:ok, group_descriptions}) do
group_descriptions
|> Enum.map(fn %{group_id: consumer_group, members: members} -> [consumer_group, members] end)
|> Enum.map(fn [consumer_group, members] -> {consumer_group, get_topic_names(members)} end)
end
defp get_topic_names(members) do
Enum.flat_map(members, fn member ->
KafkaexLagExporter.TopicNameParser.parse_topic_names(member.member_assignment)
end)
end
end end

View File

@@ -38,7 +38,7 @@ defmodule KafkaexLagExporter.Metrics do
@doc false @doc false
def group_sum_lag({host, _port}, consumer_lags) do def group_sum_lag({host, _port}, consumer_lags) do
Enum.each(consumer_lags, fn [group_name, lag] -> Enum.each(consumer_lags, fn {group_name, lag} ->
:telemetry.execute( :telemetry.execute(
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag], [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
%{ %{
@@ -55,7 +55,7 @@ defmodule KafkaexLagExporter.Metrics do
@doc false @doc false
def group_lag_per_partition({host, _port}, consumer_lags) do def group_lag_per_partition({host, _port}, consumer_lags) do
Enum.each(consumer_lags, fn [group_name, lags] -> Enum.each(consumer_lags, fn {group_name, lags} ->
Enum.each(lags, fn {partition, lag} -> Enum.each(lags, fn {partition, lag} ->
:telemetry.execute( :telemetry.execute(
[@kafka_event, :consumergroup, :group, :lag], [@kafka_event, :consumergroup, :group, :lag],

View File

@@ -1,56 +1,5 @@
defmodule KafkaexLagExporter.PromEx do defmodule KafkaexLagExporter.PromEx do
@moduledoc """ @moduledoc false
Be sure to add the following to finish setting up PromEx:
1. Update your configuration (config.exs, dev.exs, prod.exs, releases.exs, etc) to
configure the necessary bit of PromEx. Be sure to check out `PromEx.Config` for
more details regarding configuring PromEx:
```
config :kafkaex_lag_exporter, KafkaexLagExporter.PromEx,
disabled: false,
manual_metrics_start_delay: :no_delay,
drop_metrics_groups: [],
grafana: :disabled,
metrics_server: :disabled
```
2. Add this module to your application supervision tree. It should be one of the first
things that is started so that no Telemetry events are missed. For example, if PromEx
is started after your Repo module, you will miss Ecto's init events and the dashboards
will be missing some data points:
```
def start(_type, _args) do
children = [
KafkaexLagExporter.PromEx,
...
]
...
end
```
3. Update your `endpoint.ex` file to expose your metrics (or configure a standalone
server using the `:metrics_server` config options). Be sure to put this plug before
your `Plug.Telemetry` entry so that you can avoid having calls to your `/metrics`
endpoint create their own metrics and logs which can pollute your logs/metrics given
that Prometheus will scrape at a regular interval and that can get noisy:
```
defmodule KafkaexLagExporterWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :kafkaex_lag_exporter
...
plug PromEx.Plug, prom_ex_module: KafkaexLagExporter.PromEx
...
end
```
4. Update the list of plugins in the `plugins/0` function return list to reflect your
application's dependencies. Also update the list of dashboards that are to be uploaded
to Grafana in the `dashboards/0` function.
"""
use PromEx, otp_app: :kafkaex_lag_exporter use PromEx, otp_app: :kafkaex_lag_exporter

View File

@@ -3,6 +3,7 @@ defmodule KafkaexLagExporter.TopicNameParser do
@invalid_topic_characters ~r/[^[:alnum:]\-\._]/ @invalid_topic_characters ~r/[^[:alnum:]\-\._]/
@spec parse_topic_names(binary) :: list(binary)
def parse_topic_names(member_assignment) do def parse_topic_names(member_assignment) do
member_assignment member_assignment
|> String.chunk(:printable) |> String.chunk(:printable)