Compare commits

...

13 Commits

Author SHA1 Message Date
29c032cb16 Add license 2024-06-01 12:02:18 +02:00
a34e91770f Simplify test code 2024-04-16 22:33:28 +02:00
98e79b927d Update docs to include configurable interval 2024-04-16 22:21:46 +02:00
9e9676d4e4 Remove code for Tilt cluster 2024-04-16 22:16:44 +02:00
bf8389b4e0 Simplify code 2024-04-16 22:11:50 +02:00
ee5b69f646 Make update interval configurable 2024-04-16 22:08:52 +02:00
7104414bc1 Fix network issues in Docker 2024-04-16 08:11:01 +02:00
d580b0f1d0 Update console container 2024-04-16 08:10:46 +02:00
0312a1e37c Remove commented code 2024-04-16 08:10:27 +02:00
c98e8da2ec Format README 2024-04-16 08:09:41 +02:00
0016cd0f74 Update README 2024-04-14 21:55:02 +02:00
b098385e39 Set correct type 2024-04-14 21:30:51 +02:00
eae84fef1f Add more tests for KafkaUtils 2024-04-14 21:22:20 +02:00
29 changed files with 305 additions and 556 deletions

View File

@@ -1,4 +1,2 @@
elixir 1.16.2-otp-26
erlang 26.0.2
tilt 0.33.11
kind 0.22.0

12
LICENSE Normal file
View File

@@ -0,0 +1,12 @@
ISC License
Copyright 2024 Pascal Schmid
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
granted, provided that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
OF THIS SOFTWARE.

View File

@@ -1,14 +1,43 @@
# KafkaexLagExporter
# KafkaExLagExporter
This project will collect Kafka consumer lag and provide it via Prometheus.
## Metrics
[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner.
KafkaExLagExporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus.
**`kafka_consumergroup_group_topic_sum_lag`**
Labels: `cluster_name, group, topic, consumer_id, member_host`
The sum of the difference between the last produced offset and the last consumed offset of all partitions in this
topic for this group.
**`kafka_consumergroup_group_lag`**
Labels: `cluster_name, group, partition, topic, member_host, consumer_id`
The difference between the last produced offset and the last consumed offset for this partition in this topic
for this group.
## Start
```bash
docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 lechindianer/kafkaex_lag_exporter:0.1
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
```
Now you can check the exposed metrics at [localhost:4000](localhost:4000).
Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).
## Configuration
KafkaExLagExporter uses 5 seconds as the default interval to update the lags. If you want to configure it to use
another value, set `KAFKA_EX_INTERVAL_MS`, e.g.
```bash
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
lechindianer/kafkaex_lag_exporter:0.2.0
```
## Developing
@@ -18,24 +47,38 @@ To start the project locally:
KAFKA_BROKERS="localhost:9092" iex -S mix
```
There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
KafkaexLagExporter:
```bash
docker-compose up --build
docker compose up --build
```
Kowl is served at [localhost:8080](localhost:8080).
Kowl is served at [http://localhost:8080](http://localhost:8080).
### Tests
```bash
MIX_ENV=test mix test --no-start
```
# Don't forget to check credo for code violations:
### Code style
Don't forget to check [credo](https://hexdocs.pm/credo/overview.html) for code violations:
```bash
mix credo
```
This project also leverages the use of typespecs in order to provide static code checking:
```bash
mix dialyzer
```
## Links
Source is on [Gitlab](https://gitlab.com/lechindianer/kafkaex-lag-exporter).
The initial project [Kafka Lag Exporter](https://github.com/seglo/kafka-lag-exporter) was a huge inspiration for me
creating my first real Elixir project. Thank you!

View File

@@ -1,3 +0,0 @@
allow_k8s_contexts('default')
include('konvert/Tiltfile')

View File

@@ -1,8 +0,0 @@
kafka:
brokers:
- kafka1:9092
- kafka2:9092
- kafka3:9092
# server:
# listenPort: 8080

View File

@@ -32,7 +32,7 @@ services:
- redpanda_network
console:
image: docker.redpanda.com/redpandadata/console:v2.2.4
image: docker.redpanda.com/redpandadata/console:v2.4.6
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
@@ -63,7 +63,6 @@ services:
image: quay.io/cloudhut/owl-shop:latest
networks:
- redpanda_network
#platform: 'linux/amd64'
environment:
- SHOP_KAFKA_BROKERS=redpanda:29092
- SHOP_KAFKA_TOPICREPLICATIONFACTOR=1
@@ -78,7 +77,6 @@ services:
container_name: connect
networks:
- redpanda_network
#platform: 'linux/amd64'
depends_on:
- redpanda
ports:
@@ -109,6 +107,8 @@ services:
- '4000:4000'
environment:
- KAFKA_BROKERS=redpanda:29092
networks:
- redpanda_network
depends_on:
- redpanda
restart: "unless-stopped"

View File

@@ -1,15 +0,0 @@
#!/usr/bin/env bash
kind delete cluster
kind create cluster --config kind.yaml
kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m
kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml
LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "address=/kind.cluster/$LB_IP"

View File

@@ -1,24 +0,0 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraPortMappings:
- containerPort: 80
hostPort: 80
protocol: TCP
- containerPort: 443
hostPort: 443
protocol: TCP
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f

View File

@@ -1,21 +0,0 @@
k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
k8s_yaml('./redpanda-deployment.yaml')
k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
k8s_yaml('./redpanda-service.yaml')
k8s_yaml('./connect-service.yaml')
k8s_yaml('./connect-deployment.yaml')
k8s_yaml('./owl-shop-deployment.yaml')
k8s_yaml('./console-deployment.yaml')
k8s_yaml('./console-service.yaml')
k8s_yaml('./console-ingress.yaml')
docker_build('kafka-lag-exporter', './..')
k8s_yaml('./kafka-lag-exporter-deployment.yaml')
k8s_yaml('./kafka-lag-exporter-service.yaml')
k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)

View File

@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: connect
name: connect
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: connect
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: connect
spec:
containers:
- env:
- name: CONNECT_BOOTSTRAP_SERVERS
value: redpanda:29092
- name: CONNECT_CONFIGURATION
value: |
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
group.id=connectors-cluster
offset.storage.topic=_internal_connectors_offsets
config.storage.topic=_internal_connectors_configs
status.storage.topic=_internal_connectors_status
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
offset.flush.interval.ms=1000
producer.linger.ms=50
producer.batch.size=131072
- name: CONNECT_GC_LOG_ENABLED
value: "false"
- name: CONNECT_HEAP_OPTS
value: -Xms512M -Xmx512M
- name: CONNECT_LOG_LEVEL
value: info
image: docker.redpanda.com/redpandadata/connectors:latest
name: connect
ports:
- containerPort: 8083
hostname: connect
restartPolicy: Always

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: connect
name: connect
spec:
ports:
- name: "8083"
port: 8083
targetPort: 8083
selector:
io.kompose.service: connect

View File

@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: console
name: console
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: console
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: console
spec:
containers:
- args:
- -c
- echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
command:
- /bin/sh
env:
- name: CONFIG_FILEPATH
value: /tmp/config.yml
- name: CONSOLE_CONFIG_FILE
value: |
kafka:
brokers: ["redpanda:29092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda:8081"]
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda:9644"]
connect:
enabled: true
clusters:
- name: local-connect-cluster
url: http://connect:8083
image: docker.redpanda.com/redpandadata/console:v2.2.4
name: console
ports:
- containerPort: 8080
restartPolicy: Always

View File

@@ -1,17 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ingress-myserviceb
spec:
rules:
- host: console.lechindianer.hack
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: console
port:
number: 8080
ingressClassName: nginx

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: console
name: console
spec:
ports:
- name: "8080"
port: 8080
targetPort: 8080
selector:
io.kompose.service: console

View File

@@ -1,26 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: kafka-lag-exporter
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-default: "true"
io.kompose.service: kafka-lag-exporter
spec:
containers:
- env:
- name: KAFKA_BROKERS
value: redpanda:29092
image: kafka-lag-exporter
name: kafka-lag-exporter
ports:
- containerPort: 4000
restartPolicy: Always

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
ports:
- name: "4000"
port: 4000
targetPort: 4000
selector:
io.kompose.service: kafka-lag-exporter

View File

@@ -1,31 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: owl-shop
name: owl-shop
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: owl-shop
template:
metadata:
creationTimestamp: null
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: owl-shop
spec:
containers:
- env:
- name: SHOP_KAFKA_BROKERS
value: redpanda:29092
- name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
value: "1"
- name: SHOP_TRAFFIC_INTERVAL_DURATION
value: 0.1s
- name: SHOP_TRAFFIC_INTERVAL_RATE
value: "6"
image: quay.io/cloudhut/owl-shop:latest
name: owl-shop
restartPolicy: Always

View File

@@ -1,44 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: redpanda
strategy:
type: Recreate
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: redpanda
spec:
containers:
- args:
- redpanda start
- --smp 1
- --overprovisioned
- --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr 0.0.0.0:8082
- --advertise-pandaproxy-addr localhost:8082
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
name: redpanda
ports:
- containerPort: 8081
- containerPort: 8082
- containerPort: 9092
- containerPort: 9644
- containerPort: 29092
volumeMounts:
- mountPath: /var/lib/redpanda/data
name: redpanda
restartPolicy: Always
volumes:
- name: redpanda
persistentVolumeClaim:
claimName: redpanda

View File

@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-default
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"

View File

@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-redpanda-network
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"

View File

@@ -1,12 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Mi

View File

@@ -1,25 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
ports:
- name: "8081"
port: 8081
targetPort: 8081
- name: "8082"
port: 8082
targetPort: 8082
- name: "9092"
port: 9092
targetPort: 9092
- name: "9644"
port: 9644
targetPort: 9644
- name: "29092"
port: 29092
targetPort: 29092
selector:
io.kompose.service: redpanda

View File

@@ -3,16 +3,6 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
@callback connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}
@callback resolve_offsets(binary, :earliest | :latest, atom) ::
list({non_neg_integer, integer})
@callback fetch_committed_offsets(binary, binary, atom) ::
list({non_neg_integer, non_neg_integer})
@callback lag(binary, binary, atom) :: list({non_neg_integer, integer})
@callback lag_total(binary, binary, atom) :: non_neg_integer
@callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary)
@callback get_consumer_group_info(
@@ -21,23 +11,20 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
list(binary)
) :: list({consumer_group :: binary, topics :: list(binary)})
@callback lag(binary, binary, atom) :: list({non_neg_integer, integer})
def connection(client), do: impl().connection(client)
def resolve_offsets(topic, type, client), do: impl().resolve_offsets(topic, type, client)
def fetch_committed_offsets(topic, consumer_group, client),
do: impl().fetch_committed_offsets(topic, consumer_group, client)
def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client)
def lag_total(topic, consumer_group, client),
do: impl().lag_total(topic, consumer_group, client)
def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port})
def get_consumer_group_names(endpoint), do: impl().get_consumer_group_names(endpoint)
def get_consumer_group_info(endpoint, list, consumer_group_names),
do: impl().get_consumer_group_info(endpoint, list, consumer_group_names)
def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client)
def fetch_committed_offsets(topic, consumer_group, client),
do: impl().fetch_committed_offsets(topic, consumer_group, client)
defp impl,
do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils)
end

View File

@@ -4,10 +4,9 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
alias KafkaexLagExporter.ConsumerOffset
alias KafkaexLagExporter.KafkaUtils
# TODO fix type
@spec get(KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint()) :: %{
lags: list(binary),
sum: list(binary)
lags: list(ConsumerOffset.t()),
sum: list(ConsumerOffset.t())
}
def get(endpoint) do
consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint)

View File

@@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
require Logger
@interval 5_000
def start_link(default) when is_list(default) do
GenServer.start_link(__MODULE__, default, name: __MODULE__)
end
@@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
clients = Application.get_env(:brod, :clients)
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
interval =
System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
|> String.to_integer()
Logger.info("Received Kafka endpoints: #{inspect(endpoints)}")
Logger.info("Updating lag information every #{interval} milliseconds")
Process.send_after(self(), :tick, @interval)
Process.send_after(self(), :tick, interval)
{:ok, %{endpoints: endpoints}}
{:ok, %{endpoints: endpoints, interval: interval}}
end
@impl true
def handle_info(:tick, state) do
[endpoint | _] = state.endpoints
interval = state.interval
%{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
Process.send_after(self(), :tick, @interval)
Process.send_after(self(), :tick, interval)
{:noreply, state}
end

View File

@@ -7,8 +7,6 @@ defmodule KafkaexLagExporter.KafkaUtils do
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
require Logger
@default_client :client1
def connection, do: connection(@default_client)
@@ -32,54 +30,8 @@ defmodule KafkaexLagExporter.KafkaUtils do
end
@impl true
def resolve_offsets(topic, type, client) do
{endpoints, sock_opts} = connection(client)
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
for i <- Range.new(0, partitions_count - 1),
{:ok, offset} =
KafkaWrapper.resolve_offset(endpoints, topic, i, type, sock_opts) do
{i, offset}
end
end
@impl true
def fetch_committed_offsets(_topic, consumer_group, client) do
{endpoints, sock_opts} = connection(client)
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
for r <- response,
pr <- r[:partitions],
do: {pr[:partition_index], pr[:committed_offset]}
end
@impl true
def lag(topic, consumer_group, client) do
offsets =
resolve_offsets(topic, :latest, client)
|> Enum.sort_by(fn {key, _value} -> key end)
committed_offsets =
fetch_committed_offsets(topic, consumer_group, client)
|> Enum.sort_by(fn {key, _value} -> key end)
for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
{part, current - committed}
end
end
@impl true
def lag_total(topic, consumer_group, client) do
for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do
acc -> acc + recs
end
end
@impl true
def get_consumer_group_names({host, port}) do
[{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], [])
def get_consumer_group_names(endpoint) do
[{_, groups} | _] = KafkaWrapper.list_all_groups([endpoint], [])
groups
|> Enum.filter(fn {_, _, protocol} -> protocol === "consumer" end)
@@ -92,25 +44,69 @@ defmodule KafkaexLagExporter.KafkaUtils do
group_descriptions
|> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
get_member_info(members)
|> Enum.map(fn {topic_names, consumer_id, member_host} ->
{consumer_group, topic_names, consumer_id, member_host}
end)
get_member_info(consumer_group, members)
end)
end
@impl true
def lag(topic, consumer_group, client) do
offsets =
resolve_offsets(topic, client)
|> Enum.sort_by(fn {key, _value} -> key end)
committed_offsets =
fetch_committed_offsets(topic, consumer_group, client)
|> Enum.sort_by(fn {key, _value} -> key end)
for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
{part, current - committed}
end
end
@spec resolve_offsets(binary, atom) :: list({non_neg_integer, integer})
defp resolve_offsets(topic, client) do
{endpoints, sock_opts} = connection(client)
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
for i <- Range.new(0, partitions_count - 1),
{:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
{i, offset}
end
end
@spec fetch_committed_offsets(binary, binary, atom) ::
list({non_neg_integer, non_neg_integer})
defp fetch_committed_offsets(_topic, consumer_group, client) do
{endpoints, sock_opts} = connection(client)
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
for r <- response, pr <- r[:partitions] do
{pr[:partition_index], pr[:committed_offset]}
end
end
@spec get_member_info(
binary,
list(%{client_host: binary, member_assignment: binary, member_id: binary})
) ::
list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
defp get_member_info(members) do
Enum.map(members, fn %{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
} ->
topic_names = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
{topic_names, consumer_id, member_host}
list(
{consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
member_host :: binary}
)
defp get_member_info(consumer_group, members) do
members
|> Enum.map(fn %{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
} ->
topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
{topics, consumer_id, member_host}
end)
|> Enum.map(fn {topics, consumer_id, member_host} ->
{consumer_group, topics, consumer_id, member_host}
end)
end
end

View File

@@ -36,9 +36,13 @@ defmodule KafkaexLagExporter.Metrics do
)
end
@spec group_sum_lag(
KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
list(ConsumerOffset.t())
) :: :ok
@doc false
def group_sum_lag({host, _port}, cunsumer_offsets) do
Enum.each(cunsumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
def group_sum_lag({host, _port}, consumer_offsets) do
Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
lag = elem(consumer_offset.lag, 1)
:telemetry.execute(
@@ -55,6 +59,10 @@ defmodule KafkaexLagExporter.Metrics do
end)
end
@spec group_lag_per_partition(
KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
list(ConsumerOffset.t())
) :: :ok
@doc false
def group_lag_per_partition({host, _port}, consumer_offsets) do
Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->

View File

@@ -40,9 +40,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
end
test "should return the calculated lags" do
test_endpoint = {"test endpoint", 666}
%{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint)
%{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get({"test endpoint", 666})
assert sum == [
%ConsumerOffset{

View File

@@ -5,9 +5,11 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
alias KafkaexLagExporter.KafkaUtils
@test_endpoint "test_host:1234"
@test_consumer_group "test-consumer-group"
@test_endpoint {"test_host", "1234"}
@test_sock_opts [ssl: []]
@test_group_name1 "test-consumer_group1"
@test_group_name2 "test-consumer_group"
@test_topic_name "test-topic_name"
setup do
patch(
@@ -19,86 +21,175 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
:ok
end
describe "resolve_offsets" do
describe "get_consumer_group_names" do
setup do
client = :client2
topic = "test-topic"
consumer_info1 = {:a, @test_group_name1, "consumer"}
consumer_info2 = {:b, @test_group_name2, "something-other"}
patch(
KafkaWrapper,
:get_partitions_count,
fn _, _ -> {:ok, 3} end
)
patch(KafkaWrapper, :list_all_groups, fn _, _ ->
[{@test_endpoint, [consumer_info1, consumer_info2]}]
end)
patch(
KafkaWrapper,
:resolve_offset,
fn _, _, _, _, _ -> {:ok, 42} end
)
group_names = KafkaUtils.get_consumer_group_names(@test_endpoint)
offsets = KafkaUtils.resolve_offsets(topic, :earliest, client)
[client: client, offsets: offsets, topic: topic]
[group_names: group_names]
end
test "should get partition count", context do
expected_topic = context[:topic]
expected_client = context[:client]
assert_called(KafkaWrapper.get_partitions_count(expected_client, expected_topic))
test "should list all groups" do
assert_called(KafkaWrapper.list_all_groups([@test_endpoint], []))
end
test "should resolve offset per partition", context do
expected_topic = context[:topic]
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 0, :earliest, @test_sock_opts)
)
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 1, :earliest, @test_sock_opts)
)
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 2, :earliest, @test_sock_opts)
)
end
test "should return offsets", context do
assert context[:offsets] == [{0, 42}, {1, 42}, {2, 42}]
test "should filter for consumer groups and return names", context do
assert context[:group_names] === [@test_group_name1]
end
end
describe "fetch_committed_offsets" do
describe "get_consumer_group_info" do
setup do
partition_info1 = %{partition_index: 0, committed_offset: 23}
partition_info2 = %{partition_index: 1, committed_offset: 42}
consumer_group_name = "test-group-id1"
member_assignment = "test_member_assignment"
member_assignment = "test_member_assignment"
consumer_id = "test_consumer_id"
member_host = "test_member_host"
patch(
KafkaWrapper,
:fetch_committed_offsets,
fn _, _, _ ->
{:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
end
)
patch(KafkaWrapper, :describe_groups, fn _, _, _ ->
{
:ok,
[
%{
group_id: consumer_group_name,
members: [
%{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
}
]
}
]
}
end)
offsets = KafkaUtils.fetch_committed_offsets("test-topic", @test_consumer_group, :test_atom)
patch(KafkaexLagExporter.TopicNameParser, :parse_topic_names, fn _ -> [@test_topic_name] end)
[offsets: offsets]
group_info =
KafkaUtils.get_consumer_group_info(@test_endpoint, [], [
@test_group_name1,
@test_group_name2
])
[
member_host: member_host,
group_info: group_info,
consumer_group_name: consumer_group_name,
member_assignment: member_assignment,
consumer_id: consumer_id
]
end
test "should get the committed offsets from KafkaWrapper" do
test "should describe groups" do
assert_called(
KafkaWrapper.describe_groups(@test_endpoint, [], [@test_group_name1, @test_group_name2])
)
end
test "should parse topic names", context do
expected_member_assignment = context[:expected_member_assignment]
assert_called(
KafkaexLagExporter.TopicNameParser.parse_topic_names(expected_member_assignment)
)
end
test "should filter for consumer groups and return names", context do
expected_consumer_group_name = context[:consumer_group_name]
expected_consumer_id = context[:consumer_id]
expected_member_host = context[:member_host]
assert context[:group_info] === [
{
expected_consumer_group_name,
[@test_topic_name],
expected_consumer_id,
expected_member_host
}
]
end
end
describe "lag" do
setup do
consumer_group = "test-consumer-group"
client = :client1
committed_offset1 = 23
committed_offset2 = 42
resolved_offset = 50
partition_info1 = %{partition_index: 0, committed_offset: committed_offset1}
partition_info2 = %{partition_index: 1, committed_offset: committed_offset2}
patch(KafkaWrapper, :get_partitions_count, fn _, _ -> {:ok, 2} end)
patch(KafkaWrapper, :resolve_offset, fn _, _, _, _, _ -> {:ok, resolved_offset} end)
patch(KafkaWrapper, :fetch_committed_offsets, fn _, _, _ ->
{:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
end)
lag = KafkaUtils.lag(@test_topic_name, consumer_group, client)
[
lag: lag,
consumer_group: consumer_group,
client: client,
committed_offset1: committed_offset1,
committed_offset2: committed_offset2,
resolved_offset: resolved_offset
]
end
test "should start connection", context do
expected_client = context[:client]
assert_called(KafkaUtils.connection(expected_client))
end
test "should get partition count", context do
expected_client = context[:client]
assert_called(KafkaWrapper.get_partitions_count(expected_client, @test_topic_name))
end
test "should resolve offsets for each partition" do
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 0, :latest, @test_sock_opts)
)
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 1, :latest, @test_sock_opts)
)
end
test "should fetch committed offsets", context do
expected_consumer_group = context[:consumer_group]
assert_called(
KafkaWrapper.fetch_committed_offsets(
@test_endpoint,
@test_sock_opts,
@test_consumer_group
expected_consumer_group
)
)
end
test "should return offsets", context do
assert context[:offsets] == [{0, 23}, {1, 42}]
test "should return calculated lag per partition", context do
expected_lag_partition_1 = context[:resolved_offset] - context[:committed_offset1]
expected_lag_partition_2 = context[:resolved_offset] - context[:committed_offset2]
assert context[:lag] === [
{0, expected_lag_partition_1},
{1, expected_lag_partition_2}
]
end
end
end