Compare commits
4 Commits
7b40cbedd1...f09ad58fb5
| Author | SHA1 | Date |
|---|---|---|
| | f09ad58fb5 | |
| | 3d835480d1 | |
| | ada8f12309 | |
| | ac5280acfd | |
KafkaexLagExporter.KafkaUtils:

```diff
@@ -1,14 +1,18 @@
 # source code taken from https://github.com/reachfh/brod_group_subscriber_example
 
 defmodule KafkaexLagExporter.KafkaUtils do
+  @behaviour KafkaexLagExporter.KafkaUtils.Behaviour
+
   @moduledoc "Utility functions for dealing with Kafka"
 
+  alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
+
+  require Logger
+
   @default_client :client1
 
-  @type endpoint() :: {host :: atom(), port :: non_neg_integer()}
-
   def connection, do: connection(@default_client)
 
-  @spec connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}
+  @impl true
   def connection(client) do
     clients = Application.get_env(:brod, :clients)
     config = clients[client]
```
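The new `@behaviour KafkaexLagExporter.KafkaUtils.Behaviour` implies a callback module that this compare view does not show. A plausible sketch, reconstructed from the `@spec` and `@type` lines the diff deletes (the callback list below is inferred, not taken from the repository):

```elixir
defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
  @moduledoc "Inferred sketch: callbacks implemented by KafkaexLagExporter.KafkaUtils."

  # Presumably absorbs the @type endpoint() removed from KafkaUtils.
  @type endpoint() :: {host :: atom(), port :: non_neg_integer()}

  # Signatures mirror the @spec annotations this diff replaces with @impl true.
  @callback connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}
  @callback resolve_offsets(binary(), :earliest | :latest, atom()) ::
              list({non_neg_integer(), integer()})
  @callback fetch_committed_offsets(binary(), binary(), atom()) ::
              {non_neg_integer(), non_neg_integer()}
  @callback lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()})
  @callback lag_total(binary(), binary(), atom()) :: non_neg_integer()
end
```

Moving the specs into `@callback` definitions keeps the same Dialyzer coverage while allowing an alternative implementation of the behaviour to stand in for `KafkaUtils` elsewhere.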
```diff
@@ -27,31 +31,31 @@ defmodule KafkaexLagExporter.KafkaUtils do
     {endpoints, sock_opts}
   end
 
-  @spec resolve_offsets(binary(), :earliest | :latest, atom()) ::
-          list({non_neg_integer(), integer()})
+  @impl true
   def resolve_offsets(topic, type, client) do
     {endpoints, sock_opts} = connection(client)
 
-    {:ok, partitions_count} = :brod.get_partitions_count(client, topic)
+    {:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
 
     for i <- Range.new(0, partitions_count - 1),
-        {:ok, offset} = :brod.resolve_offset(endpoints, topic, i, type, sock_opts) do
+        {:ok, offset} =
+          KafkaWrapper.resolve_offset(endpoints, topic, i, type, sock_opts) do
       {i, offset}
     end
   end
 
-  @spec fetch_committed_offsets(binary(), binary(), atom()) ::
-          {non_neg_integer(), non_neg_integer()}
+  @impl true
   def fetch_committed_offsets(_topic, consumer_group, client) do
     {endpoints, sock_opts} = connection(client)
-    {:ok, response} = :brod.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
+
+    {:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
 
     for r <- response,
         pr <- r[:partitions],
         do: {pr[:partition_index], pr[:committed_offset]}
   end
 
-  @spec lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()})
+  @impl true
   def lag(topic, consumer_group, client) do
     offsets =
       resolve_offsets(topic, :latest, client)
```
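`KafkaWrapper` is the alias for `KafkaexLagExporter.KafkaWrapper.Behaviour`, and the rewritten calls go through that module directly instead of through `:brod`. For that to work, the behaviour module must also carry dispatching functions that forward to a configurable implementation. A minimal sketch of that pattern, assuming a `:kafka_wrapper` config key and a `KafkaexLagExporter.KafkaWrapper` default module (neither name is confirmed by this diff):

```elixir
defmodule KafkaexLagExporter.KafkaWrapper.Behaviour do
  @moduledoc "Sketch: boundary around :brod so tests can swap in a mock."

  @callback get_partitions_count(atom(), binary()) :: {:ok, pos_integer()} | {:error, any()}
  @callback resolve_offset(list(), binary(), non_neg_integer(), :earliest | :latest, Keyword.t()) ::
              {:ok, integer()} | {:error, any()}
  @callback fetch_committed_offsets(list(), Keyword.t(), binary()) :: {:ok, list()} | {:error, any()}
  @callback list_all_groups(list(), Keyword.t()) :: list()
  @callback describe_groups({charlist(), non_neg_integer()}, Keyword.t(), list(binary())) ::
              {:ok, list(map())} | {:error, any()}

  # Each public function forwards to the implementation chosen at runtime.
  def get_partitions_count(client, topic), do: impl().get_partitions_count(client, topic)

  def resolve_offset(endpoints, topic, partition, type, sock_opts),
    do: impl().resolve_offset(endpoints, topic, partition, type, sock_opts)

  def fetch_committed_offsets(endpoints, sock_opts, group),
    do: impl().fetch_committed_offsets(endpoints, sock_opts, group)

  def list_all_groups(endpoints, sock_opts), do: impl().list_all_groups(endpoints, sock_opts)

  def describe_groups(endpoint, sock_opts, names), do: impl().describe_groups(endpoint, sock_opts, names)

  defp impl do
    Application.get_env(:kafkaex_lag_exporter, :kafka_wrapper, KafkaexLagExporter.KafkaWrapper)
  end
end
```

The production implementation can then be a thin delegation layer onto `:brod` (`:brod.get_partitions_count/2`, `:brod.resolve_offset/5`, `:brod.fetch_committed_offsets/3`, `:brod.list_all_groups/2`, and `:brod.describe_groups/3` all exist with these arities).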
```diff
@@ -66,10 +70,37 @@ defmodule KafkaexLagExporter.KafkaUtils do
     end
   end
 
-  @spec lag_total(binary(), binary(), atom()) :: non_neg_integer()
+  @impl true
   def lag_total(topic, consumer_group, client) do
     for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do
       acc -> acc + recs
     end
   end
+
+  @impl true
+  def get_consumer_group_names({host, port}) do
+    [{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], [])
+
+    groups
+    |> Enum.filter(fn {_, _, protocol} -> protocol == "consumer" end)
+    |> Enum.map(fn {_, group_name, "consumer"} -> group_name end)
+  end
+
+  @impl true
+  def topic_names_for_consumer_groups(endpoint, list \\ [], consumer_group_names) do
+    KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
+    |> get_topic_names_for_consumer_groups
+  end
+
+  defp get_topic_names_for_consumer_groups({:ok, group_descriptions}) do
+    group_descriptions
+    |> Enum.map(fn %{group_id: consumer_group, members: members} -> [consumer_group, members] end)
+    |> Enum.map(fn [consumer_group, members] -> {consumer_group, get_topic_names(members)} end)
+  end
+
+  defp get_topic_names(members) do
+    Enum.flat_map(members, fn member ->
+      KafkaexLagExporter.TopicNameParser.parse_topic_names(member.member_assignment)
+    end)
+  end
 end
```
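The payoff of routing everything through the behaviour is unit-testability. A sketch of how the new `get_consumer_group_names/1` could be tested with Mox, assuming the config-based dispatch sketched above and a mock named `KafkaWrapperMock` (the `{:brod_cg, group_id, protocol_type}` tuples mirror the `#brod_cg{}` records that `:brod.list_all_groups/2` returns):

```elixir
# test_helper.exs (assumed setup)
Mox.defmock(KafkaWrapperMock, for: KafkaexLagExporter.KafkaWrapper.Behaviour)
Application.put_env(:kafkaex_lag_exporter, :kafka_wrapper, KafkaWrapperMock)

defmodule KafkaexLagExporter.KafkaUtilsTest do
  use ExUnit.Case, async: true

  import Mox

  setup :verify_on_exit!

  test "returns only groups speaking the consumer protocol" do
    expect(KafkaWrapperMock, :list_all_groups, fn [{"broker-1", 9092}], [] ->
      [
        {{"broker-1", 9092},
         [{:brod_cg, "lag-group", "consumer"}, {:brod_cg, "connect-sink", "connect"}]}
      ]
    end)

    assert KafkaexLagExporter.KafkaUtils.get_consumer_group_names({"broker-1", 9092}) ==
             ["lag-group"]
  end
end
```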
```diff
@@ -38,7 +38,7 @@ defmodule KafkaexLagExporter.Metrics do
 
   @doc false
   def group_sum_lag({host, _port}, consumer_lags) do
-    Enum.each(consumer_lags, fn [group_name, lag] ->
+    Enum.each(consumer_lags, fn {group_name, lag} ->
       :telemetry.execute(
         [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
         %{
@@ -55,7 +55,7 @@ defmodule KafkaexLagExporter.Metrics do
 
   @doc false
   def group_lag_per_partition({host, _port}, consumer_lags) do
-    Enum.each(consumer_lags, fn [group_name, lags] ->
+    Enum.each(consumer_lags, fn {group_name, lags} ->
       Enum.each(lags, fn {partition, lag} ->
         :telemetry.execute(
           [@kafka_event, :consumergroup, :group, :lag],
```
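The `Metrics` change is pure destructuring: each entry in `consumer_lags` is now a `{group_name, lag}` tuple instead of a two-element list, matching the tuple shapes produced in `KafkaUtils`. A quick probe of the emitted event, assuming the module's `@kafka_event` prefix is `:kafka` (its actual value is defined elsewhere in `KafkaexLagExporter.Metrics`):

```elixir
# Attach a throwaway handler; :telemetry.execute/3 runs handlers
# synchronously in the calling process, so the message arrives here.
:telemetry.attach(
  "sum-lag-probe",
  [:kafka, :consumergroup, :group, :topic, :sum, :lag],
  fn _event, measurements, metadata, _config ->
    send(self(), {:lag_event, measurements, metadata})
  end,
  nil
)

KafkaexLagExporter.Metrics.group_sum_lag({"broker-1", 9092}, [{"lag-group", 42}])

receive do
  {:lag_event, measurements, metadata} -> IO.inspect({measurements, metadata})
after
  100 -> IO.puts("no event")
end
```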
````diff
@@ -1,56 +1,5 @@
 defmodule KafkaexLagExporter.PromEx do
-  @moduledoc """
-  Be sure to add the following to finish setting up PromEx:
-
-  1. Update your configuration (config.exs, dev.exs, prod.exs, releases.exs, etc) to
-     configure the necessary bit of PromEx. Be sure to check out `PromEx.Config` for
-     more details regarding configuring PromEx:
-
-  ```
-  config :kafkaex_lag_exporter, KafkaexLagExporter.PromEx,
-    disabled: false,
-    manual_metrics_start_delay: :no_delay,
-    drop_metrics_groups: [],
-    grafana: :disabled,
-    metrics_server: :disabled
-  ```
-
-  2. Add this module to your application supervision tree. It should be one of the first
-     things that is started so that no Telemetry events are missed. For example, if PromEx
-     is started after your Repo module, you will miss Ecto's init events and the dashboards
-     will be missing some data points:
-
-  ```
-  def start(_type, _args) do
-    children = [
-      KafkaexLagExporter.PromEx,
-
-      ...
-    ]
-
-    ...
-  end
-  ```
-
-  3. Update your `endpoint.ex` file to expose your metrics (or configure a standalone
-     server using the `:metrics_server` config options). Be sure to put this plug before
-     your `Plug.Telemetry` entry so that you can avoid having calls to your `/metrics`
-     endpoint create their own metrics and logs which can pollute your logs/metrics given
-     that Prometheus will scrape at a regular interval and that can get noisy:
-
-  ```
-  defmodule KafkaexLagExporterWeb.Endpoint do
-    use Phoenix.Endpoint, otp_app: :kafkaex_lag_exporter
-
-    ...
-
-    plug PromEx.Plug, prom_ex_module: KafkaexLagExporter.PromEx
-
-    ...
-  end
-  ```
-
-  4. Update the list of plugins in the `plugins/0` function return list to reflect your
-     application's dependencies. Also update the list of dashboards that are to be uploaded
-     to Grafana in the `dashboards/0` function.
-  """
+  @moduledoc false
 
   use PromEx, otp_app: :kafkaex_lag_exporter
 
````
```diff
@@ -3,6 +3,7 @@ defmodule KafkaexLagExporter.TopicNameParser do
 
   @invalid_topic_characters ~r/[^[:alnum:]\-\._]/
 
+  @spec parse_topic_names(binary) :: list(binary)
   def parse_topic_names(member_assignment) do
     member_assignment
     |> String.chunk(:printable)
```
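For the `TopicNameParser` context: `parse_topic_names/1` works by slicing the raw member-assignment binary into printable runs, which is what `String.chunk/2` does; the `@invalid_topic_characters` regex then weeds out runs that are not valid topic names. An illustrative call (the byte layout is made up, not a real Kafka `MemberAssignment` encoding):

```elixir
iex> String.chunk(<<0, 0, 0, 1, "orders-v1", 0, 0, 0>>, :printable)
[<<0, 0, 0, 1>>, "orders-v1", <<0, 0, 0>>]
```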