Compare commits
4 Commits
7b40cbedd1
...
f09ad58fb5
| Author | SHA1 | Date | |
|---|---|---|---|
| f09ad58fb5 | |||
| 3d835480d1 | |||
| ada8f12309 | |||
| ac5280acfd |
@@ -1,14 +1,18 @@
|
||||
# source code taken from https://github.com/reachfh/brod_group_subscriber_example
|
||||
|
||||
defmodule KafkaexLagExporter.KafkaUtils do
|
||||
@behaviour KafkaexLagExporter.KafkaUtils.Behaviour
|
||||
|
||||
@moduledoc "Utility functions for dealing with Kafka"
|
||||
|
||||
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
|
||||
|
||||
require Logger
|
||||
|
||||
@default_client :client1
|
||||
|
||||
@type endpoint() :: {host :: atom(), port :: non_neg_integer()}
|
||||
|
||||
def connection, do: connection(@default_client)
|
||||
@spec connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}
|
||||
@impl true
|
||||
def connection(client) do
|
||||
clients = Application.get_env(:brod, :clients)
|
||||
config = clients[client]
|
||||
@@ -27,31 +31,31 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
||||
{endpoints, sock_opts}
|
||||
end
|
||||
|
||||
@spec resolve_offsets(binary(), :earliest | :latest, atom()) ::
|
||||
list({non_neg_integer(), integer()})
|
||||
@impl true
|
||||
def resolve_offsets(topic, type, client) do
|
||||
{endpoints, sock_opts} = connection(client)
|
||||
|
||||
{:ok, partitions_count} = :brod.get_partitions_count(client, topic)
|
||||
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
|
||||
|
||||
for i <- Range.new(0, partitions_count - 1),
|
||||
{:ok, offset} = :brod.resolve_offset(endpoints, topic, i, type, sock_opts) do
|
||||
{:ok, offset} =
|
||||
KafkaWrapper.resolve_offset(endpoints, topic, i, type, sock_opts) do
|
||||
{i, offset}
|
||||
end
|
||||
end
|
||||
|
||||
@spec fetch_committed_offsets(binary(), binary(), atom()) ::
|
||||
{non_neg_integer(), non_neg_integer()}
|
||||
@impl true
|
||||
def fetch_committed_offsets(_topic, consumer_group, client) do
|
||||
{endpoints, sock_opts} = connection(client)
|
||||
{:ok, response} = :brod.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
|
||||
|
||||
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
|
||||
|
||||
for r <- response,
|
||||
pr <- r[:partitions],
|
||||
do: {pr[:partition_index], pr[:committed_offset]}
|
||||
end
|
||||
|
||||
@spec lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()})
|
||||
@impl true
|
||||
def lag(topic, consumer_group, client) do
|
||||
offsets =
|
||||
resolve_offsets(topic, :latest, client)
|
||||
@@ -66,10 +70,37 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
||||
end
|
||||
end
|
||||
|
||||
@spec lag_total(binary(), binary(), atom()) :: non_neg_integer()
|
||||
@impl true
|
||||
def lag_total(topic, consumer_group, client) do
|
||||
for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do
|
||||
acc -> acc + recs
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def get_consumer_group_names({host, port}) do
|
||||
[{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], [])
|
||||
|
||||
groups
|
||||
|> Enum.filter(fn {_, _, protocol} -> protocol == "consumer" end)
|
||||
|> Enum.map(fn {_, group_name, "consumer"} -> group_name end)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def topic_names_for_consumer_groups(endpoint, list \\ [], consumer_group_names) do
|
||||
KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
|
||||
|> get_topic_names_for_consumer_groups
|
||||
end
|
||||
|
||||
defp get_topic_names_for_consumer_groups({:ok, group_descriptions}) do
|
||||
group_descriptions
|
||||
|> Enum.map(fn %{group_id: consumer_group, members: members} -> [consumer_group, members] end)
|
||||
|> Enum.map(fn [consumer_group, members] -> {consumer_group, get_topic_names(members)} end)
|
||||
end
|
||||
|
||||
defp get_topic_names(members) do
|
||||
Enum.flat_map(members, fn member ->
|
||||
KafkaexLagExporter.TopicNameParser.parse_topic_names(member.member_assignment)
|
||||
end)
|
||||
end
|
||||
end
|
||||
@@ -38,7 +38,7 @@ defmodule KafkaexLagExporter.Metrics do
|
||||
|
||||
@doc false
|
||||
def group_sum_lag({host, _port}, consumer_lags) do
|
||||
Enum.each(consumer_lags, fn [group_name, lag] ->
|
||||
Enum.each(consumer_lags, fn {group_name, lag} ->
|
||||
:telemetry.execute(
|
||||
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
||||
%{
|
||||
@@ -55,7 +55,7 @@ defmodule KafkaexLagExporter.Metrics do
|
||||
|
||||
@doc false
|
||||
def group_lag_per_partition({host, _port}, consumer_lags) do
|
||||
Enum.each(consumer_lags, fn [group_name, lags] ->
|
||||
Enum.each(consumer_lags, fn {group_name, lags} ->
|
||||
Enum.each(lags, fn {partition, lag} ->
|
||||
:telemetry.execute(
|
||||
[@kafka_event, :consumergroup, :group, :lag],
|
||||
|
||||
@@ -1,56 +1,5 @@
|
||||
defmodule KafkaexLagExporter.PromEx do
|
||||
@moduledoc """
|
||||
Be sure to add the following to finish setting up PromEx:
|
||||
|
||||
1. Update your configuration (config.exs, dev.exs, prod.exs, releases.exs, etc) to
|
||||
configure the necessary bit of PromEx. Be sure to check out `PromEx.Config` for
|
||||
more details regarding configuring PromEx:
|
||||
```
|
||||
config :kafkaex_lag_exporter, KafkaexLagExporter.PromEx,
|
||||
disabled: false,
|
||||
manual_metrics_start_delay: :no_delay,
|
||||
drop_metrics_groups: [],
|
||||
grafana: :disabled,
|
||||
metrics_server: :disabled
|
||||
```
|
||||
|
||||
2. Add this module to your application supervision tree. It should be one of the first
|
||||
things that is started so that no Telemetry events are missed. For example, if PromEx
|
||||
is started after your Repo module, you will miss Ecto's init events and the dashboards
|
||||
will be missing some data points:
|
||||
```
|
||||
def start(_type, _args) do
|
||||
children = [
|
||||
KafkaexLagExporter.PromEx,
|
||||
|
||||
...
|
||||
]
|
||||
|
||||
...
|
||||
end
|
||||
```
|
||||
|
||||
3. Update your `endpoint.ex` file to expose your metrics (or configure a standalone
|
||||
server using the `:metrics_server` config options). Be sure to put this plug before
|
||||
your `Plug.Telemetry` entry so that you can avoid having calls to your `/metrics`
|
||||
endpoint create their own metrics and logs which can pollute your logs/metrics given
|
||||
that Prometheus will scrape at a regular interval and that can get noisy:
|
||||
```
|
||||
defmodule KafkaexLagExporterWeb.Endpoint do
|
||||
use Phoenix.Endpoint, otp_app: :kafkaex_lag_exporter
|
||||
|
||||
...
|
||||
|
||||
plug PromEx.Plug, prom_ex_module: KafkaexLagExporter.PromEx
|
||||
|
||||
...
|
||||
end
|
||||
```
|
||||
|
||||
4. Update the list of plugins in the `plugins/0` function return list to reflect your
|
||||
application's dependencies. Also update the list of dashboards that are to be uploaded
|
||||
to Grafana in the `dashboards/0` function.
|
||||
"""
|
||||
@moduledoc false
|
||||
|
||||
use PromEx, otp_app: :kafkaex_lag_exporter
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ defmodule KafkaexLagExporter.TopicNameParser do
|
||||
|
||||
@invalid_topic_characters ~r/[^[:alnum:]\-\._]/
|
||||
|
||||
@spec parse_topic_names(binary) :: list(binary)
|
||||
def parse_topic_names(member_assignment) do
|
||||
member_assignment
|
||||
|> String.chunk(:printable)
|
||||
|
||||
Reference in New Issue
Block a user