Compare commits

..

26 Commits

Author SHA1 Message Date
29c032cb16 Add license 2024-06-01 12:02:18 +02:00
a34e91770f Simplify test code 2024-04-16 22:33:28 +02:00
98e79b927d Update docs to include configurable interval 2024-04-16 22:21:46 +02:00
9e9676d4e4 Remove code for Tilt cluster 2024-04-16 22:16:44 +02:00
bf8389b4e0 Simplify code 2024-04-16 22:11:50 +02:00
ee5b69f646 Make update interval configurable 2024-04-16 22:08:52 +02:00
7104414bc1 Fix network issues in Docker 2024-04-16 08:11:01 +02:00
d580b0f1d0 Update console container 2024-04-16 08:10:46 +02:00
0312a1e37c Remove commented code 2024-04-16 08:10:27 +02:00
c98e8da2ec Format README 2024-04-16 08:09:41 +02:00
0016cd0f74 Update README 2024-04-14 21:55:02 +02:00
b098385e39 Set correct type 2024-04-14 21:30:51 +02:00
eae84fef1f Add more tests for KafkaUtils 2024-04-14 21:22:20 +02:00
baed79e1ba Add more tags to metrics 2024-03-28 23:49:44 +01:00
a68a0126c8 Update test for KafkaUtils 2024-03-28 23:46:26 +01:00
1630d1dcda Remove empty module 2024-03-28 23:45:56 +01:00
4922da165c Return more data for members 2024-03-28 23:45:40 +01:00
9fbf7a98b8 Fix init cluster script 2024-03-28 23:44:25 +01:00
08b5923d52 Simplify code 2024-03-28 21:23:51 +01:00
6bf68b74ec Install test watcher 2024-03-28 19:05:59 +01:00
1cc9afbcf4 Adjust async level of tests 2024-03-28 18:41:35 +01:00
f09ad58fb5 Fix type mismatch 2024-03-27 22:26:02 +01:00
3d835480d1 Use behaviour for KafkaUtils 2024-03-27 22:25:29 +01:00
ada8f12309 Remove implemented doc comment 2024-03-27 22:24:06 +01:00
ac5280acfd Add type spec 2024-03-27 22:23:12 +01:00
7b40cbedd1 Replace Mox with Patch 2024-03-27 22:19:32 +01:00
42 changed files with 607 additions and 719 deletions

View File

@@ -1,4 +1,2 @@
elixir 1.16.2-otp-26
erlang 26.0.2
tilt 0.33.11
kind 0.22.0

12
LICENSE Normal file
View File

@@ -0,0 +1,12 @@
ISC License
Copyright 2024 Pascal Schmid
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
granted, provided that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
OF THIS SOFTWARE.

View File

@@ -1,14 +1,43 @@
# KafkaexLagExporter
# KafkaExLagExporter
This project will collect Kafka consumer lag and provide them via Prometheus.
## Metrics
[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner.
KafkaExLagExporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus.
**`kafka_consumergroup_group_topic_sum_lag`**
Labels: `cluster_name, group, topic, consumer_id, member_host`
The sum of the difference between the last produced offset and the last consumed offset of all partitions in this
topic for this group.
**`kafka_consumergroup_group_lag`**
Labels: `cluster_name, group, partition, topic, member_host, consumer_id`
The difference between the last produced offset and the last consumed offset for this partition in this topic
partition for this group.
## Start
```bash
docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 lechindianer/kafkaex_lag_exporter:0.1
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
```
Now you can check the exposed metrics at [localhost:4000](localhost:4000).
Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).
## Configuration
KafkaExLagExporter uses 5 seconds as default interval to update the lags. If you want to configure it to use another
value set `KAFKA_EX_INTERVAL_MS`, i.e.
```bash
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
lechindianer/kafkaex_lag_exporter:0.2.0
```
## Developing
@@ -18,24 +47,38 @@ To start the project locally:
KAFKA_BROKERS="localhost:9092" iex -S mix
```
There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
KafkaexLagExporter:
```bash
docker-compose up --build
```bash
docker compose up --build
```
Kowl is served at [localhost:8080](localhost:8080).
Kowl is served at [http://localhost:8080](http://localhost:8080).
### Tests
```bash
MIX_ENV=test mix test --no-start
MIX_ENV=test mix test --no-start
```
# Don't forget to check credo for code violations:
### Code style
Don't forget to check [credo](https://hexdocs.pm/credo/overview.html) for code violations:
```bash
mix credo
```
This project also leverages the use of typespecs in order to provide static code checking:
```bash
mix dialyzer
```
## Links
Source is on [Gitlab](https://gitlab.com/lechindianer/kafkaex-lag-exporter).
The initial project [Kafka Lag Exporter](https://github.com/seglo/kafka-lag-exporter) was a huge inspiration for me
creating my first real Elixir project. Thank you!

View File

@@ -1,3 +0,0 @@
allow_k8s_contexts('default')
include('konvert/Tiltfile')

View File

@@ -1,8 +0,0 @@
kafka:
brokers:
- kafka1:9092
- kafka2:9092
- kafka3:9092
# server:
# listenPort: 8080

View File

@@ -32,7 +32,7 @@ services:
- redpanda_network
console:
image: docker.redpanda.com/redpandadata/console:v2.2.4
image: docker.redpanda.com/redpandadata/console:v2.4.6
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
@@ -63,7 +63,6 @@ services:
image: quay.io/cloudhut/owl-shop:latest
networks:
- redpanda_network
#platform: 'linux/amd64'
environment:
- SHOP_KAFKA_BROKERS=redpanda:29092
- SHOP_KAFKA_TOPICREPLICATIONFACTOR=1
@@ -78,7 +77,6 @@ services:
container_name: connect
networks:
- redpanda_network
#platform: 'linux/amd64'
depends_on:
- redpanda
ports:
@@ -109,6 +107,8 @@ services:
- '4000:4000'
environment:
- KAFKA_BROKERS=redpanda:29092
networks:
- redpanda_network
depends_on:
- redpanda
restart: "unless-stopped"

View File

@@ -1,15 +0,0 @@
#!/usr/bin/env bash
kind delete cluster
kind create cluster --config deployment/kind.yaml
kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m
kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml
LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "address=/kind.cluster/$LB_IP"

View File

@@ -1,21 +0,0 @@
k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
k8s_yaml('./redpanda-deployment.yaml')
k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
k8s_yaml('./redpanda-service.yaml')
k8s_yaml('./connect-service.yaml')
k8s_yaml('./connect-deployment.yaml')
k8s_yaml('./owl-shop-deployment.yaml')
k8s_yaml('./console-deployment.yaml')
k8s_yaml('./console-service.yaml')
k8s_yaml('./console-ingress.yaml')
docker_build('kafka-lag-exporter', './..')
k8s_yaml('./kafka-lag-exporter-deployment.yaml')
k8s_yaml('./kafka-lag-exporter-service.yaml')
k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)

View File

@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: connect
name: connect
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: connect
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: connect
spec:
containers:
- env:
- name: CONNECT_BOOTSTRAP_SERVERS
value: redpanda:29092
- name: CONNECT_CONFIGURATION
value: |
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
group.id=connectors-cluster
offset.storage.topic=_internal_connectors_offsets
config.storage.topic=_internal_connectors_configs
status.storage.topic=_internal_connectors_status
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
offset.flush.interval.ms=1000
producer.linger.ms=50
producer.batch.size=131072
- name: CONNECT_GC_LOG_ENABLED
value: "false"
- name: CONNECT_HEAP_OPTS
value: -Xms512M -Xmx512M
- name: CONNECT_LOG_LEVEL
value: info
image: docker.redpanda.com/redpandadata/connectors:latest
name: connect
ports:
- containerPort: 8083
hostname: connect
restartPolicy: Always

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: connect
name: connect
spec:
ports:
- name: "8083"
port: 8083
targetPort: 8083
selector:
io.kompose.service: connect

View File

@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: console
name: console
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: console
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: console
spec:
containers:
- args:
- -c
- echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
command:
- /bin/sh
env:
- name: CONFIG_FILEPATH
value: /tmp/config.yml
- name: CONSOLE_CONFIG_FILE
value: |
kafka:
brokers: ["redpanda:29092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda:8081"]
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda:9644"]
connect:
enabled: true
clusters:
- name: local-connect-cluster
url: http://connect:8083
image: docker.redpanda.com/redpandadata/console:v2.2.4
name: console
ports:
- containerPort: 8080
restartPolicy: Always

View File

@@ -1,17 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ingress-myserviceb
spec:
rules:
- host: console.lechindianer.hack
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: console
port:
number: 8080
ingressClassName: nginx

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: console
name: console
spec:
ports:
- name: "8080"
port: 8080
targetPort: 8080
selector:
io.kompose.service: console

View File

@@ -1,26 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: kafka-lag-exporter
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-default: "true"
io.kompose.service: kafka-lag-exporter
spec:
containers:
- env:
- name: KAFKA_BROKERS
value: redpanda:29092
image: kafka-lag-exporter
name: kafka-lag-exporter
ports:
- containerPort: 4000
restartPolicy: Always

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
ports:
- name: "4000"
port: 4000
targetPort: 4000
selector:
io.kompose.service: kafka-lag-exporter

View File

@@ -1,31 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: owl-shop
name: owl-shop
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: owl-shop
template:
metadata:
creationTimestamp: null
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: owl-shop
spec:
containers:
- env:
- name: SHOP_KAFKA_BROKERS
value: redpanda:29092
- name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
value: "1"
- name: SHOP_TRAFFIC_INTERVAL_DURATION
value: 0.1s
- name: SHOP_TRAFFIC_INTERVAL_RATE
value: "6"
image: quay.io/cloudhut/owl-shop:latest
name: owl-shop
restartPolicy: Always

View File

@@ -1,44 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: redpanda
strategy:
type: Recreate
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: redpanda
spec:
containers:
- args:
- redpanda start
- --smp 1
- --overprovisioned
- --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr 0.0.0.0:8082
- --advertise-pandaproxy-addr localhost:8082
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
name: redpanda
ports:
- containerPort: 8081
- containerPort: 8082
- containerPort: 9092
- containerPort: 9644
- containerPort: 29092
volumeMounts:
- mountPath: /var/lib/redpanda/data
name: redpanda
restartPolicy: Always
volumes:
- name: redpanda
persistentVolumeClaim:
claimName: redpanda

View File

@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-default
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"

View File

@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-redpanda-network
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"

View File

@@ -1,12 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Mi

View File

@@ -1,25 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
ports:
- name: "8081"
port: 8081
targetPort: 8081
- name: "8082"
port: 8082
targetPort: 8082
- name: "9092"
port: 9092
targetPort: 9092
- name: "9644"
port: 9644
targetPort: 9644
- name: "29092"
port: 29092
targetPort: 29092
selector:
io.kompose.service: redpanda

View File

@@ -1,13 +0,0 @@
defmodule KafkaexLagExporter do
@moduledoc """
KafkaexLagExporter keeps the contexts that define your domain
and business logic.
Contexts are also responsible for managing your data, regardless
if it comes from the database, an external API or others.
"""
def hello() do
:world
end
end

View File

@@ -15,7 +15,7 @@ defmodule KafkaexLagExporter.Application do
{Phoenix.PubSub, name: KafkaexLagExporter.PubSub},
# Start the Endpoint (http/https)
KafkaexLagExporterWeb.Endpoint,
KafkaexLagExporter.ConsumerOffset
KafkaexLagExporter.ConsumerOffsetRunner
]
# See https://hexdocs.pm/elixir/Supervisor.html

View File

@@ -3,41 +3,28 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
@callback connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}
@callback resolve_offsets(binary, :earliest | :latest, atom) ::
list({non_neg_integer, integer})
@callback fetch_committed_offsets(binary, binary, atom) ::
list({non_neg_integer, non_neg_integer})
@callback lag(binary, binary, atom) :: list({non_neg_integer, integer})
@callback lag_total(binary, binary, atom) :: non_neg_integer
@callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary)
@callback topic_names_for_consumer_groups(
@callback get_consumer_group_info(
{host :: atom, port :: non_neg_integer},
list(binary),
list(binary)
) :: list(binary)
) :: list({consumer_group :: binary, topics :: list(binary)})
@callback lag(binary, binary, atom) :: list({non_neg_integer, integer})
def connection(client), do: impl().connection(client)
def resolve_offsets(topic, type, client), do: impl().resolve_offsets(topic, type, client)
def get_consumer_group_names(endpoint), do: impl().get_consumer_group_names(endpoint)
def get_consumer_group_info(endpoint, list, consumer_group_names),
do: impl().get_consumer_group_info(endpoint, list, consumer_group_names)
def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client)
def fetch_committed_offsets(topic, consumer_group, client),
do: impl().fetch_committed_offsets(topic, consumer_group, client)
def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client)
def lag_total(topic, consumer_group, client),
do: impl().lag_total(topic, consumer_group, client)
def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port})
def topic_names_for_consumer_groups(endpoint, list, consumer_group_names),
do: impl().topic_names_for_consumer_groups(endpoint, list, consumer_group_names)
defp impl,
do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils)
end

View File

@@ -1,41 +1,17 @@
defmodule KafkaexLagExporter.ConsumerOffset do
@moduledoc "Genserver implementation to set offset metrics for consumer groups"
@moduledoc "Struct holding all relevant telemetry information of consumers"
use GenServer
@type t :: %__MODULE__{
consumer_group: binary,
topic: binary,
lag: list({partition :: non_neg_integer, lag :: non_neg_integer}),
consumer_id: binary,
member_host: binary
}
require Logger
@interval 5_000
def start_link(default) when is_list(default) do
GenServer.start_link(__MODULE__, default, name: __MODULE__)
end
@impl true
def init(_) do
Logger.info("Starting #{__MODULE__}")
clients = Application.get_env(:brod, :clients)
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
Process.send_after(self(), :tick, @interval)
{:ok, %{endpoints: endpoints}}
end
@impl true
def handle_info(:tick, state) do
[endpoint | _] = state.endpoints
{consumer_lags, consumer_lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, consumer_lags)
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, consumer_lag_sum)
Process.send_after(self(), :tick, @interval)
{:noreply, state}
end
defstruct consumer_group: "",
topic: "",
lag: [],
consumer_id: "",
member_host: ""
end

View File

@@ -1,38 +1,49 @@
defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
@moduledoc "Calculate summarized lag for each consumer group"
require Logger
alias KafkaexLagExporter.ConsumerOffset
alias KafkaexLagExporter.KafkaUtils
# TODO: change return type
@spec get(KafkaexLagExporter.KafkaWrapper.endpoint()) :: {any(), any()}
@spec get(KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint()) :: %{
lags: list(ConsumerOffset.t()),
sum: list(ConsumerOffset.t())
}
def get(endpoint) do
consumer_group_names = KafkaexLagExporter.KafkaUtils.get_consumer_group_names(endpoint)
consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint)
consumer_lags =
KafkaexLagExporter.KafkaUtils.topic_names_for_consumer_groups(
endpoint,
[],
consumer_group_names
)
|> Enum.map(fn [consumer_group, topics] ->
[consumer_group, get_lag_for_consumer(consumer_group, topics)]
end)
KafkaUtils.get_consumer_group_info(endpoint, [], consumer_group_names)
|> Enum.flat_map(&get_lag_per_topic(&1))
consumer_lag_sum = get_lag_for_consumer_sum(consumer_lags)
%{lags: consumer_lags, sum: consumer_lag_sum}
end
defp get_lag_for_consumer(consumer_group, topics) do
topics
|> Enum.flat_map(fn topic ->
KafkaexLagExporter.KafkaUtils.lag(topic, consumer_group, :client1)
@spec get_lag_per_topic(
{consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
member_host :: binary}
) :: list(ConsumerOffset.t())
defp get_lag_per_topic({consumer_group, topics, consumer_id, member_host}) do
Enum.map(topics, fn topic ->
lag = KafkaUtils.lag(topic, consumer_group, :client1)
%ConsumerOffset{
consumer_group: consumer_group,
topic: topic,
lag: lag,
consumer_id: consumer_id,
member_host: member_host
}
end)
end
@spec get_lag_per_topic(list(ConsumerOffset.t())) :: list(ConsumerOffset.t())
defp get_lag_for_consumer_sum(lags_per_consumer_group) do
lags_per_consumer_group
|> Enum.map(fn [topic, lag_per_partition] -> [topic, sum_topic_lag(lag_per_partition, 0)] end)
Enum.map(lags_per_consumer_group, fn consumer_offset ->
lag_sum = sum_topic_lag(consumer_offset.lag, 0)
%ConsumerOffset{consumer_offset | lag: {0, lag_sum}}
end)
end
defp sum_topic_lag([], acc), do: acc

View File

@@ -0,0 +1,45 @@
defmodule KafkaexLagExporter.ConsumerOffsetRunner do
@moduledoc "Genserver implementation to set offset metrics for consumer groups"
use GenServer
require Logger
def start_link(default) when is_list(default) do
GenServer.start_link(__MODULE__, default, name: __MODULE__)
end
@impl true
def init(_) do
Logger.info("Starting #{__MODULE__}")
clients = Application.get_env(:brod, :clients)
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
interval =
System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
|> String.to_integer()
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
Logger.info("Updating lag information every #{interval} milliseconds")
Process.send_after(self(), :tick, interval)
{:ok, %{endpoints: endpoints, interval: interval}}
end
@impl true
def handle_info(:tick, state) do
[endpoint | _] = state.endpoints
interval = state.interval
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
Process.send_after(self(), :tick, interval)
{:noreply, state}
end
end

View File

@@ -0,0 +1,112 @@
# source code taken from https://github.com/reachfh/brod_group_subscriber_example
defmodule KafkaexLagExporter.KafkaUtils do
@behaviour KafkaexLagExporter.KafkaUtils.Behaviour
@moduledoc "Utility functions for dealing with Kafka"
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
@default_client :client1
def connection, do: connection(@default_client)
@impl true
def connection(client) do
clients = Application.get_env(:brod, :clients)
config = clients[client]
endpoints = config[:endpoints] || [{~c"localhost", 9092}]
sock_opts =
case Keyword.fetch(config, :ssl) do
{:ok, ssl_opts} ->
[ssl: ssl_opts]
:error ->
[]
end
{endpoints, sock_opts}
end
@impl true
def get_consumer_group_names(endpoint) do
[{_, groups} | _] = KafkaWrapper.list_all_groups([endpoint], [])
groups
|> Enum.filter(fn {_, _, protocol} -> protocol === "consumer" end)
|> Enum.map(fn {_, group_name, _} -> group_name end)
end
@impl true
def get_consumer_group_info(endpoint, list \\ [], consumer_group_names) do
{:ok, group_descriptions} = KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
group_descriptions
|> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
get_member_info(consumer_group, members)
end)
end
@impl true
def lag(topic, consumer_group, client) do
offsets =
resolve_offsets(topic, client)
|> Enum.sort_by(fn {key, _value} -> key end)
committed_offsets =
fetch_committed_offsets(topic, consumer_group, client)
|> Enum.sort_by(fn {key, _value} -> key end)
for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
{part, current - committed}
end
end
@spec resolve_offsets(binary, atom) :: list({non_neg_integer, integer})
defp resolve_offsets(topic, client) do
{endpoints, sock_opts} = connection(client)
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
for i <- Range.new(0, partitions_count - 1),
{:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
{i, offset}
end
end
@spec fetch_committed_offsets(binary, binary, atom) ::
list({non_neg_integer, non_neg_integer})
defp fetch_committed_offsets(_topic, consumer_group, client) do
{endpoints, sock_opts} = connection(client)
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
for r <- response, pr <- r[:partitions] do
{pr[:partition_index], pr[:committed_offset]}
end
end
@spec get_member_info(
binary,
list(%{client_host: binary, member_assignment: binary, member_id: binary})
) ::
list(
{consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
member_host :: binary}
)
defp get_member_info(consumer_group, members) do
members
|> Enum.map(fn %{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
} ->
topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
{topics, consumer_id, member_host}
end)
|> Enum.map(fn {topics, consumer_id, member_host} ->
{consumer_group, topics, consumer_id, member_host}
end)
end
end

View File

@@ -3,6 +3,8 @@ defmodule KafkaexLagExporter.Metrics do
use PromEx.Plugin
alias KafkaexLagExporter.ConsumerOffset
require Logger
@kafka_event :kafka
@@ -21,52 +23,60 @@ defmodule KafkaexLagExporter.Metrics do
event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
description: "Sum of group offset lag across topic partitions",
measurement: :lag,
# TODO: add more tags like member_host, consumer_id, client_id, ...
tags: [:cluster_name, :group, :topic]
tags: [:cluster_name, :group, :topic, :consumer_id, :member_host]
),
last_value(
[@kafka_event, :consumergroup, :group, :lag],
event_name: [@kafka_event, :consumergroup, :group, :lag],
description: "Group offset lag of a partition",
measurement: :lag,
# TODO: add more tags like member_host, consumer_id, client_id, ...
tags: [:cluster_name, :group, :partition, :topic]
tags: [:cluster_name, :group, :partition, :topic, :consumer_id, :member_host]
)
]
)
end
@spec group_sum_lag(
KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
list(ConsumerOffset.t())
) :: :ok
@doc false
def group_sum_lag({host, _port}, consumer_lags) do
Enum.each(consumer_lags, fn [group_name, lag] ->
def group_sum_lag({host, _port}, consumer_offsets) do
Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
lag = elem(consumer_offset.lag, 1)
:telemetry.execute(
[@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
%{
lag: lag
},
%{lag: lag},
%{
cluster_name: host,
group: group_name,
topic: []
group: consumer_offset.consumer_group,
topic: consumer_offset.topic,
consumer_id: consumer_offset.consumer_id,
member_host: consumer_offset.member_host
}
)
end)
end
@spec group_lag_per_partition(
KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
list(ConsumerOffset.t())
) :: :ok
@doc false
def group_lag_per_partition({host, _port}, consumer_lags) do
Enum.each(consumer_lags, fn [group_name, lags] ->
Enum.each(lags, fn {partition, lag} ->
def group_lag_per_partition({host, _port}, consumer_offsets) do
Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
Enum.each(consumer_offset.lag, fn {partition, lag} ->
:telemetry.execute(
[@kafka_event, :consumergroup, :group, :lag],
%{
lag: lag
},
%{lag: lag},
%{
cluster_name: host,
group: group_name,
group: consumer_offset.consumer_group,
partition: partition,
topic: []
topic: consumer_offset.topic,
consumer_id: consumer_offset.consumer_id,
member_host: consumer_offset.member_host
}
)
end)

View File

@@ -1,56 +1,5 @@
defmodule KafkaexLagExporter.PromEx do
@moduledoc """
Be sure to add the following to finish setting up PromEx:
1. Update your configuration (config.exs, dev.exs, prod.exs, releases.exs, etc) to
configure the necessary bit of PromEx. Be sure to check out `PromEx.Config` for
more details regarding configuring PromEx:
```
config :kafkaex_lag_exporter, KafkaexLagExporter.PromEx,
disabled: false,
manual_metrics_start_delay: :no_delay,
drop_metrics_groups: [],
grafana: :disabled,
metrics_server: :disabled
```
2. Add this module to your application supervision tree. It should be one of the first
things that is started so that no Telemetry events are missed. For example, if PromEx
is started after your Repo module, you will miss Ecto's init events and the dashboards
will be missing some data points:
```
def start(_type, _args) do
children = [
KafkaexLagExporter.PromEx,
...
]
...
end
```
3. Update your `endpoint.ex` file to expose your metrics (or configure a standalone
server using the `:metrics_server` config options). Be sure to put this plug before
your `Plug.Telemetry` entry so that you can avoid having calls to your `/metrics`
endpoint create their own metrics and logs which can pollute your logs/metrics given
that Prometheus will scrape at a regular interval and that can get noisy:
```
defmodule KafkaexLagExporterWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :kafkaex_lag_exporter
...
plug PromEx.Plug, prom_ex_module: KafkaexLagExporter.PromEx
...
end
```
4. Update the list of plugins in the `plugins/0` function return list to reflect your
application's dependencies. Also update the list of dashboards that are to be uploaded
to Grafana in the `dashboards/0` function.
"""
@moduledoc false
use PromEx, otp_app: :kafkaex_lag_exporter

View File

@@ -3,6 +3,7 @@ defmodule KafkaexLagExporter.TopicNameParser do
@invalid_topic_characters ~r/[^[:alnum:]\-\._]/
@spec parse_topic_names(binary) :: list(binary)
def parse_topic_names(member_assignment) do
member_assignment
|> String.chunk(:printable)

View File

@@ -1,75 +0,0 @@
# source code taken from https://github.com/reachfh/brod_group_subscriber_example
defmodule KafkaexLagExporter.KafkaUtils do
@moduledoc "Utility functions for dealing with Kafka"
@default_client :client1
@type endpoint() :: {host :: atom(), port :: non_neg_integer()}
def connection, do: connection(@default_client)
@spec connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}
def connection(client) do
clients = Application.get_env(:brod, :clients)
config = clients[client]
endpoints = config[:endpoints] || [{~c"localhost", 9092}]
sock_opts =
case Keyword.fetch(config, :ssl) do
{:ok, ssl_opts} ->
[ssl: ssl_opts]
:error ->
[]
end
{endpoints, sock_opts}
end
@spec resolve_offsets(binary(), :earliest | :latest, atom()) ::
list({non_neg_integer(), integer()})
def resolve_offsets(topic, type, client) do
{endpoints, sock_opts} = connection(client)
{:ok, partitions_count} = :brod.get_partitions_count(client, topic)
for i <- Range.new(0, partitions_count - 1),
{:ok, offset} = :brod.resolve_offset(endpoints, topic, i, type, sock_opts) do
{i, offset}
end
end
@spec fetch_committed_offsets(binary(), binary(), atom()) ::
{non_neg_integer(), non_neg_integer()}
def fetch_committed_offsets(_topic, consumer_group, client) do
{endpoints, sock_opts} = connection(client)
{:ok, response} = :brod.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
for r <- response,
pr <- r[:partitions],
do: {pr[:partition_index], pr[:committed_offset]}
end
@spec lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()})
def lag(topic, consumer_group, client) do
offsets =
resolve_offsets(topic, :latest, client)
|> Enum.sort_by(fn {key, _value} -> key end)
committed_offsets =
fetch_committed_offsets(topic, consumer_group, client)
|> Enum.sort_by(fn {key, _value} -> key end)
for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
{part, current - committed}
end
end
@spec lag_total(binary(), binary(), atom()) :: non_neg_integer()
def lag_total(topic, consumer_group, client) do
for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do
acc -> acc + recs
end
end
end

View File

@@ -37,12 +37,13 @@ defmodule KafkaexLagExporter.MixProject do
{:telemetry_poller, "~> 1.0"},
{:jason, "~> 1.4.0"},
{:plug_cowboy, "~> 2.6.1"},
{:credo, "~> 1.7.5", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.0", only: [:dev], runtime: false},
{:mox, "~> 1.1", only: :test},
{:brod, "~> 3.17.0"},
{:prom_ex, "~> 1.8.0"},
{:telemetry, "~> 1.2"}
{:telemetry, "~> 1.2"},
{:credo, "~> 1.7.5", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.0", only: [:dev], runtime: false},
{:patch, "~> 0.13.0", only: :test},
{:mix_test_watch, "~> 1.2.0", only: [:dev, :test], runtime: false}
]
end
end

View File

@@ -16,10 +16,11 @@
"kafka_protocol": {:hex, :kafka_protocol, "4.1.5", "d15e64994a8ca99716ab47db4132614359ac1bfa56d6c5b4341fdc1aa4041518", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "c956c9357fef493b7072a35d0c3e2be02aa5186c804a412d29e62423bb15e5d9"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mint": {:hex, :mint, "1.5.2", "4805e059f96028948870d23d7783613b7e6b0e2fb4e98d720383852a760067fd", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "d77d9e9ce4eb35941907f1d3df38d8f750c357865353e21d335bdcdf6d892a02"},
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
"mix_test_watch": {:hex, :mix_test_watch, "1.2.0", "1f9acd9e1104f62f280e30fc2243ae5e6d8ddc2f7f4dc9bceb454b9a41c82b42", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "278dc955c20b3fb9a3168b5c2493c2e5cffad133548d307e0a50c7f2cfbf34f6"},
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
"octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},
"patch": {:hex, :patch, "0.13.0", "da48728f9086a835956200a671210fe88f67ff48bb1f92626989886493ac2081", [:mix], [], "hexpm", "d65a840d485dfa05bf6673269b56680e7537a05050684e713de125a351b28112"},
"phoenix": {:hex, :phoenix, "1.6.16", "e5bdd18c7a06da5852a25c7befb72246de4ddc289182285f8685a40b7b5f5451", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 1.0 or ~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e15989ff34f670a96b95ef6d1d25bad0d9c50df5df40b671d8f4a669e050ac39"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"},
"phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"},

View File

@@ -0,0 +1,96 @@
defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
use ExUnit.Case
use Patch
alias KafkaexLagExporter.ConsumerOffset
@test_consumer_group_name1 "test_consumer_1"
@test_consumer_group_name2 "test_consumer_2"
@test_lags1 [{0, 23}, {1, 42}, {2, 666}]
@test_lags2 [{0, 1}, {1, 2}, {2, 3}]
@test_topic1 "test_topic_1"
@test_topic2 "test_topic_2"
@test_topic3 "test_topic_3"
@test_consumer_id1 "test_consumer_id1"
@test_consumer_id2 "test_consumer_id2"
@test_member_host "127.0.0.1"
setup do
patch(
KafkaexLagExporter.KafkaUtils,
:get_consumer_group_names,
fn _ -> [@test_consumer_group_name1, @test_consumer_group_name2] end
)
patch(KafkaexLagExporter.KafkaUtils, :lag, &lag(&1, &2, &3))
patch(
KafkaexLagExporter.KafkaUtils,
:get_consumer_group_info,
fn _, _, _ ->
[
{@test_consumer_group_name1, [@test_topic1, @test_topic2], @test_consumer_id1,
@test_member_host},
{@test_consumer_group_name2, [@test_topic3], @test_consumer_id2, @test_member_host}
]
end
)
:ok
end
test "should return the calculated lags" do
%{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get({"test endpoint", 666})
assert sum == [
%ConsumerOffset{
consumer_group: @test_consumer_group_name1,
topic: @test_topic1,
lag: {0, 731},
consumer_id: @test_consumer_id1,
member_host: @test_member_host
},
%ConsumerOffset{
consumer_group: @test_consumer_group_name1,
topic: @test_topic2,
lag: {0, 6},
consumer_id: @test_consumer_id1,
member_host: @test_member_host
},
%ConsumerOffset{
consumer_group: @test_consumer_group_name2,
topic: @test_topic3,
lag: {0, 6},
consumer_id: @test_consumer_id2,
member_host: @test_member_host
}
]
assert lags == [
%ConsumerOffset{
consumer_group: @test_consumer_group_name1,
topic: @test_topic1,
lag: @test_lags1,
consumer_id: @test_consumer_id1,
member_host: @test_member_host
},
%ConsumerOffset{
consumer_group: @test_consumer_group_name1,
topic: @test_topic2,
lag: @test_lags2,
consumer_id: @test_consumer_id1,
member_host: @test_member_host
},
%ConsumerOffset{
consumer_group: @test_consumer_group_name2,
topic: @test_topic3,
lag: @test_lags2,
consumer_id: @test_consumer_id2,
member_host: @test_member_host
}
]
end
defp lag(@test_topic1, _, _), do: @test_lags1
defp lag(_, _, _), do: @test_lags2
end

195
test/kafka_utils_test.exs Normal file
View File

@@ -0,0 +1,195 @@
defmodule KafkaexLagExporter.KafkaUtils.Test do
use ExUnit.Case
use Patch
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
alias KafkaexLagExporter.KafkaUtils
@test_endpoint {"test_host", "1234"}
@test_sock_opts [ssl: []]
@test_group_name1 "test-consumer_group1"
@test_group_name2 "test-consumer_group"
@test_topic_name "test-topic_name"
setup do
patch(
KafkaUtils,
:connection,
fn _ -> {@test_endpoint, @test_sock_opts} end
)
:ok
end
describe "get_consumer_group_names" do
setup do
consumer_info1 = {:a, @test_group_name1, "consumer"}
consumer_info2 = {:b, @test_group_name2, "something-other"}
patch(KafkaWrapper, :list_all_groups, fn _, _ ->
[{@test_endpoint, [consumer_info1, consumer_info2]}]
end)
group_names = KafkaUtils.get_consumer_group_names(@test_endpoint)
[group_names: group_names]
end
test "should list all groups" do
assert_called(KafkaWrapper.list_all_groups([@test_endpoint], []))
end
test "should filter for consumer groups and return names", context do
assert context[:group_names] === [@test_group_name1]
end
end
describe "get_consumer_group_info" do
setup do
consumer_group_name = "test-group-id1"
member_assignment = "test_member_assignment"
member_assignment = "test_member_assignment"
consumer_id = "test_consumer_id"
member_host = "test_member_host"
patch(KafkaWrapper, :describe_groups, fn _, _, _ ->
{
:ok,
[
%{
group_id: consumer_group_name,
members: [
%{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
}
]
}
]
}
end)
patch(KafkaexLagExporter.TopicNameParser, :parse_topic_names, fn _ -> [@test_topic_name] end)
group_info =
KafkaUtils.get_consumer_group_info(@test_endpoint, [], [
@test_group_name1,
@test_group_name2
])
[
member_host: member_host,
group_info: group_info,
consumer_group_name: consumer_group_name,
member_assignment: member_assignment,
consumer_id: consumer_id
]
end
test "should describe groups" do
assert_called(
KafkaWrapper.describe_groups(@test_endpoint, [], [@test_group_name1, @test_group_name2])
)
end
test "should parse topic names", context do
expected_member_assignment = context[:expected_member_assignment]
assert_called(
KafkaexLagExporter.TopicNameParser.parse_topic_names(expected_member_assignment)
)
end
test "should filter for consumer groups and return names", context do
expected_consumer_group_name = context[:consumer_group_name]
expected_consumer_id = context[:consumer_id]
expected_member_host = context[:member_host]
assert context[:group_info] === [
{
expected_consumer_group_name,
[@test_topic_name],
expected_consumer_id,
expected_member_host
}
]
end
end
describe "lag" do
setup do
consumer_group = "test-consumer-group"
client = :client1
committed_offset1 = 23
committed_offset2 = 42
resolved_offset = 50
partition_info1 = %{partition_index: 0, committed_offset: committed_offset1}
partition_info2 = %{partition_index: 1, committed_offset: committed_offset2}
patch(KafkaWrapper, :get_partitions_count, fn _, _ -> {:ok, 2} end)
patch(KafkaWrapper, :resolve_offset, fn _, _, _, _, _ -> {:ok, resolved_offset} end)
patch(KafkaWrapper, :fetch_committed_offsets, fn _, _, _ ->
{:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
end)
lag = KafkaUtils.lag(@test_topic_name, consumer_group, client)
[
lag: lag,
consumer_group: consumer_group,
client: client,
committed_offset1: committed_offset1,
committed_offset2: committed_offset2,
resolved_offset: resolved_offset
]
end
test "should start connection", context do
expected_client = context[:client]
assert_called(KafkaUtils.connection(expected_client))
end
test "should get partition count", context do
expected_client = context[:client]
assert_called(KafkaWrapper.get_partitions_count(expected_client, @test_topic_name))
end
test "should resolve offsets for each partition" do
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 0, :latest, @test_sock_opts)
)
assert_called(
KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 1, :latest, @test_sock_opts)
)
end
test "should fetch committed offsets", context do
expected_consumer_group = context[:consumer_group]
assert_called(
KafkaWrapper.fetch_committed_offsets(
@test_endpoint,
@test_sock_opts,
expected_consumer_group
)
)
end
test "should return calculated lag per partition", context do
expected_lag_partition_1 = context[:resolved_offset] - context[:committed_offset1]
expected_lag_partition_2 = context[:resolved_offset] - context[:committed_offset2]
assert context[:lag] === [
{0, expected_lag_partition_1},
{1, expected_lag_partition_2}
]
end
end
end

View File

@@ -1,8 +0,0 @@
defmodule KafkaexLagExporterTest do
use ExUnit.Case
doctest KafkaexLagExporter
test "greets the world" do
assert KafkaexLagExporter.hello() == :world
end
end

View File

@@ -1,17 +0,0 @@
defmodule KafkaexLagExporterWeb.ErrorViewTest do
use KafkaexLagExporterWeb.ConnCase, async: true
# Bring render/3 and render_to_string/3 for testing custom views
import Phoenix.View
test "renders 404.json" do
assert render(KafkaexLagExporterWeb.ErrorView, "404.json", []) == %{
errors: %{detail: "Not Found"}
}
end
test "renders 500.json" do
assert render(KafkaexLagExporterWeb.ErrorView, "500.json", []) ==
%{errors: %{detail: "Internal Server Error"}}
end
end

View File

@@ -1,34 +0,0 @@
defmodule KafkaexLagExporterWeb.ChannelCase do
@moduledoc """
This module defines the test case to be used by
channel tests.
Such tests rely on `Phoenix.ChannelTest` and also
import other functionality to make it easier
to build common data structures and query the data layer.
Finally, if the test case interacts with the database,
we enable the SQL sandbox, so changes done to the database
are reverted at the end of every test. If you are using
PostgreSQL, you can even run database tests asynchronously
by setting `use KafkaexLagExporterWeb.ChannelCase, async: true`, although
this option is not recommended for other databases.
"""
use ExUnit.CaseTemplate
using do
quote do
# Import conveniences for testing with channels
import Phoenix.ChannelTest
import KafkaexLagExporterWeb.ChannelCase
# The default endpoint for testing
@endpoint KafkaexLagExporterWeb.Endpoint
end
end
setup _tags do
:ok
end
end

View File

@@ -1,37 +0,0 @@
defmodule KafkaexLagExporterWeb.ConnCase do
@moduledoc """
This module defines the test case to be used by
tests that require setting up a connection.
Such tests rely on `Phoenix.ConnTest` and also
import other functionality to make it easier
to build common data structures and query the data layer.
Finally, if the test case interacts with the database,
we enable the SQL sandbox, so changes done to the database
are reverted at the end of every test. If you are using
PostgreSQL, you can even run database tests asynchronously
by setting `use KafkaexLagExporterWeb.ConnCase, async: true`, although
this option is not recommended for other databases.
"""
use ExUnit.CaseTemplate
using do
quote do
# Import conveniences for testing with connections
import Plug.Conn
import Phoenix.ConnTest
import KafkaexLagExporterWeb.ConnCase
alias KafkaexLagExporterWeb.Router.Helpers, as: Routes
# The default endpoint for testing
@endpoint KafkaexLagExporterWeb.Endpoint
end
end
setup _tags do
{:ok, conn: Phoenix.ConnTest.build_conn()}
end
end

View File

@@ -1,2 +1 @@
Application.ensure_all_started(:mox)
ExUnit.start()

View File

@@ -1,6 +1,5 @@
defmodule KafkaexLagExporterTopicNameParserTest do
use ExUnit.Case
doctest KafkaexLagExporter.TopicNameParser
use ExUnit.Case, async: true
test "should parse single topic" do
test_member_assignment =