Compare commits


4 Commits

SHA1 Message Date
f09ad58fb5 Fix type mismatch 2024-03-27 22:26:02 +01:00
3d835480d1 Use behaviour for KafkaUtils 2024-03-27 22:25:29 +01:00
ada8f12309 Remove implemented doc comment 2024-03-27 22:24:06 +01:00
ac5280acfd Add type spec 2024-03-27 22:23:12 +01:00
4 changed files with 47 additions and 66 deletions

View File

@@ -1,14 +1,18 @@
# source code taken from https://github.com/reachfh/brod_group_subscriber_example
defmodule KafkaexLagExporter.KafkaUtils do
@behaviour KafkaexLagExporter.KafkaUtils.Behaviour
@moduledoc "Utility functions for dealing with Kafka"
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
require Logger
@default_client :client1
@type endpoint() :: {host :: atom(), port :: non_neg_integer()}
def connection, do: connection(@default_client)
@spec connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}
@impl true
def connection(client) do
clients = Application.get_env(:brod, :clients)
config = clients[client]
@@ -27,31 +31,31 @@ defmodule KafkaexLagExporter.KafkaUtils do
{endpoints, sock_opts}
end
@spec resolve_offsets(binary(), :earliest | :latest, atom()) ::
list({non_neg_integer(), integer()})
@impl true
def resolve_offsets(topic, type, client) do
{endpoints, sock_opts} = connection(client)
-{:ok, partitions_count} = :brod.get_partitions_count(client, topic)
+{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
for i <- Range.new(0, partitions_count - 1),
-{:ok, offset} = :brod.resolve_offset(endpoints, topic, i, type, sock_opts) do
+{:ok, offset} =
+KafkaWrapper.resolve_offset(endpoints, topic, i, type, sock_opts) do
{i, offset}
end
end
@spec fetch_committed_offsets(binary(), binary(), atom()) ::
{non_neg_integer(), non_neg_integer()}
@impl true
def fetch_committed_offsets(_topic, consumer_group, client) do
{endpoints, sock_opts} = connection(client)
-{:ok, response} = :brod.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
+{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
for r <- response,
pr <- r[:partitions],
do: {pr[:partition_index], pr[:committed_offset]}
end
@spec lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()})
@impl true
def lag(topic, consumer_group, client) do
offsets =
resolve_offsets(topic, :latest, client)
@@ -66,10 +70,37 @@ defmodule KafkaexLagExporter.KafkaUtils do
end
end
@spec lag_total(binary(), binary(), atom()) :: non_neg_integer()
@impl true
def lag_total(topic, consumer_group, client) do
for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do
acc -> acc + recs
end
end
@impl true
def get_consumer_group_names({host, port}) do
[{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], [])
groups
|> Enum.filter(fn {_, _, protocol} -> protocol == "consumer" end)
|> Enum.map(fn {_, group_name, "consumer"} -> group_name end)
end
@impl true
def topic_names_for_consumer_groups(endpoint, list \\ [], consumer_group_names) do
KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
|> get_topic_names_for_consumer_groups
end
defp get_topic_names_for_consumer_groups({:ok, group_descriptions}) do
group_descriptions
|> Enum.map(fn %{group_id: consumer_group, members: members} -> [consumer_group, members] end)
|> Enum.map(fn [consumer_group, members] -> {consumer_group, get_topic_names(members)} end)
end
defp get_topic_names(members) do
Enum.flat_map(members, fn member ->
KafkaexLagExporter.TopicNameParser.parse_topic_names(member.member_assignment)
end)
end
end
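
Neither `KafkaexLagExporter.KafkaUtils.Behaviour` nor `KafkaexLagExporter.KafkaWrapper.Behaviour` is part of this compare. As a rough sketch only: the module names come from the `@behaviour` and `alias` lines above, the callback names and specs are inferred from the `@impl true` functions and the `:brod` calls they replace, and the config-based dispatch (including the `:kafka_wrapper` key and the default module) is an assumption about how the facade might let tests swap in a mock.

```elixir
defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
  @moduledoc "Sketch: callbacks inferred from the @impl true functions in the diff."

  @type endpoint() :: {host :: atom(), port :: non_neg_integer()}

  @callback connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}
  @callback resolve_offsets(binary(), :earliest | :latest, atom()) ::
              list({non_neg_integer(), integer()})
  @callback fetch_committed_offsets(binary(), binary(), atom()) ::
              {non_neg_integer(), non_neg_integer()}
  @callback lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()})
  @callback lag_total(binary(), binary(), atom()) :: non_neg_integer()
  @callback get_consumer_group_names(endpoint()) :: [binary()]
  @callback topic_names_for_consumer_groups(endpoint(), list(), [binary()]) ::
              list({binary(), [binary()]})
end

defmodule KafkaexLagExporter.KafkaWrapper.Behaviour do
  @moduledoc "Sketch: a swappable facade over the :brod functions replaced above."

  @callback get_partitions_count(atom(), binary()) :: {:ok, pos_integer()} | {:error, any()}
  @callback resolve_offset(list(), binary(), non_neg_integer(), :earliest | :latest, Keyword.t()) ::
              {:ok, integer()} | {:error, any()}
  @callback fetch_committed_offsets(list(), Keyword.t(), binary()) ::
              {:ok, list()} | {:error, any()}
  @callback list_all_groups(list(), Keyword.t()) :: list()
  @callback describe_groups(tuple(), list(), [binary()]) :: {:ok, list()} | {:error, any()}

  # Assumed dispatch: read the implementation from config so tests can
  # install a mock (e.g. via Mox); the key and default module are hypothetical.
  defp impl do
    Application.get_env(:kafkaex_lag_exporter, :kafka_wrapper, KafkaexLagExporter.KafkaWrapper)
  end

  def get_partitions_count(client, topic), do: impl().get_partitions_count(client, topic)

  def resolve_offset(hosts, topic, partition, type, opts),
    do: impl().resolve_offset(hosts, topic, partition, type, opts)

  def fetch_committed_offsets(hosts, opts, group),
    do: impl().fetch_committed_offsets(hosts, opts, group)

  def list_all_groups(hosts, opts), do: impl().list_all_groups(hosts, opts)

  def describe_groups(endpoint, opts, groups), do: impl().describe_groups(endpoint, opts, groups)
end
```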

View File

@@ -38,7 +38,7 @@ defmodule KafkaexLagExporter.Metrics do
@doc false
def group_sum_lag({host, _port}, consumer_lags) do
-Enum.each(consumer_lags, fn [group_name, lag] ->
+Enum.each(consumer_lags, fn {group_name, lag} ->
:telemetry.execute(
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
%{
@@ -55,7 +55,7 @@ defmodule KafkaexLagExporter.Metrics do
@doc false
def group_lag_per_partition({host, _port}, consumer_lags) do
-Enum.each(consumer_lags, fn [group_name, lags] ->
+Enum.each(consumer_lags, fn {group_name, lags} ->
Enum.each(lags, fn {partition, lag} ->
:telemetry.execute(
[@kafka_event, :consumergroup, :group, :lag],
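
Both hunks in this file line up with the "Fix type mismatch" commit: the `KafkaUtils` functions above now hand back `{group_name, lag}` tuples, while these handlers were still destructuring two-element lists. A minimal illustration with made-up values:

```elixir
consumer_lags = [{"group-a", 42}, {"group-b", 7}]

# Old clause destructured two-element lists, so tuple input raised:
# Enum.each(consumer_lags, fn [group_name, lag] -> ... end)
# ** (FunctionClauseError) no function clause matching in anonymous fn

# New clause matches the tuples produced upstream:
Enum.each(consumer_lags, fn {group_name, lag} ->
  IO.puts("#{group_name}: #{lag}")
end)
```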

View File

@@ -1,56 +1,5 @@
defmodule KafkaexLagExporter.PromEx do
@moduledoc """
Be sure to add the following to finish setting up PromEx:
1. Update your configuration (config.exs, dev.exs, prod.exs, releases.exs, etc) to
configure the necessary bit of PromEx. Be sure to check out `PromEx.Config` for
more details regarding configuring PromEx:
```
config :kafkaex_lag_exporter, KafkaexLagExporter.PromEx,
disabled: false,
manual_metrics_start_delay: :no_delay,
drop_metrics_groups: [],
grafana: :disabled,
metrics_server: :disabled
```
2. Add this module to your application supervision tree. It should be one of the first
things that is started so that no Telemetry events are missed. For example, if PromEx
is started after your Repo module, you will miss Ecto's init events and the dashboards
will be missing some data points:
```
def start(_type, _args) do
children = [
KafkaexLagExporter.PromEx,
...
]
...
end
```
3. Update your `endpoint.ex` file to expose your metrics (or configure a standalone
server using the `:metrics_server` config options). Be sure to put this plug before
your `Plug.Telemetry` entry so that you can avoid having calls to your `/metrics`
endpoint create their own metrics and logs which can pollute your logs/metrics given
that Prometheus will scrape at a regular interval and that can get noisy:
```
defmodule KafkaexLagExporterWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :kafkaex_lag_exporter
...
plug PromEx.Plug, prom_ex_module: KafkaexLagExporter.PromEx
...
end
```
4. Update the list of plugins in the `plugins/0` function return list to reflect your
application's dependencies. Also update the list of dashboards that are to be uploaded
to Grafana in the `dashboards/0` function.
"""
@moduledoc false
use PromEx, otp_app: :kafkaex_lag_exporter

View File

@@ -3,6 +3,7 @@ defmodule KafkaexLagExporter.TopicNameParser do
@invalid_topic_characters ~r/[^[:alnum:]\-\._]/
@spec parse_topic_names(binary) :: list(binary)
def parse_topic_names(member_assignment) do
member_assignment
|> String.chunk(:printable)
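
`parse_topic_names/1` scans the raw `member_assignment` binary (see `get_topic_names/1` in the first file) for printable chunks; the `@invalid_topic_characters` regex above presumably filters those chunks in the part of the pipeline this compare truncates. A hypothetical call with an invented binary:

```elixir
# Real member assignments are Kafka-protocol encoded; this blob just embeds
# one topic name between non-printable bytes for illustration.
raw = <<0, 1, 0, 0, 0, 1, 0, 8>> <> "my-topic" <> <<0, 0, 0, 0>>

KafkaexLagExporter.TopicNameParser.parse_topic_names(raw)
#=> ["my-topic"]  (assumed result; depends on the truncated tail of the pipeline)
```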