Compare commits: 207926ed83...main (26 commits)

Commits (author and date columns were empty in the rendered table):
29c032cb16, a34e91770f, 98e79b927d, 9e9676d4e4, bf8389b4e0, ee5b69f646, 7104414bc1,
d580b0f1d0, 0312a1e37c, c98e8da2ec, 0016cd0f74, b098385e39, eae84fef1f, baed79e1ba,
a68a0126c8, 1630d1dcda, 4922da165c, 9fbf7a98b8, 08b5923d52, 6bf68b74ec, 1cc9afbcf4,
f09ad58fb5, 3d835480d1, ada8f12309, ac5280acfd, 7b40cbedd1
@@ -1,4 +1,2 @@
 elixir 1.16.2-otp-26
 erlang 26.0.2
-tilt 0.33.11
-kind 0.22.0
LICENSE (new file, 12 lines)

@@ -0,0 +1,12 @@
+ISC License
+
+Copyright 2024 Pascal Schmid
+
+Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
+granted, provided that the above copyright notice and this permission notice appear in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
+INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+OF THIS SOFTWARE.
README.md (57 lines changed)

@@ -1,14 +1,43 @@
-# KafkaexLagExporter
+# KafkaExLagExporter
 
 This project will collect Kafka consumer lag and provide them via Prometheus.
 
+## Metrics
+
+[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner.
+KafkaExLagExporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus.
+
+**`kafka_consumergroup_group_topic_sum_lag`**
+
+Labels: `cluster_name, group, topic, consumer_id, member_host`
+
+The sum of the difference between the last produced offset and the last consumed offset of all partitions in this
+topic for this group.
+
+**`kafka_consumergroup_group_lag`**
+
+Labels: `cluster_name, group, partition, topic, member_host, consumer_id`
+
+The difference between the last produced offset and the last consumed offset for this partition in this topic
+partition for this group.
+
 ## Start
 
 ```bash
-docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 lechindianer/kafkaex_lag_exporter:0.1
+docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
 ```
 
-Now you can check the exposed metrics at [localhost:4000](localhost:4000).
+Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).
 
+## Configuration
+
+KafkaExLagExporter uses 5 seconds as default interval to update the lags. If you want to configure it to use another
+value set `KAFKA_EX_INTERVAL_MS`, i.e.
+
+```bash
+docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
+  lechindianer/kafkaex_lag_exporter:0.2.0
+```
+
 ## Developing

@@ -18,24 +47,38 @@ To start the project locally:
 KAFKA_BROKERS="localhost:9092" iex -S mix
 ```
 
-There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
+There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
 KafkaexLagExporter:
 
 ```bash
-docker-compose up --build
+docker compose up --build
 ```
 
-Kowl is served at [localhost:8080](localhost:8080).
+Kowl is served at [http://localhost:8080](http://localhost:8080).
 
 ### Tests
 
 ```bash
 MIX_ENV=test mix test --no-start
 ```
 
-# Don't forget to check credo for code violations:
+### Code style
+
+Don't forget to check [credo](https://hexdocs.pm/credo/overview.html) for code violations:
 
 ```bash
 mix credo
 ```
 
+This project also leverages the use of typespecs in order to provide static code checking:
+
+```bash
+mix dialyzer
+```
+
 ## Links
 
 Source is on [Gitlab](https://gitlab.com/lechindianer/kafkaex-lag-exporter).
 
 The initial project [Kafka Lag Exporter](https://github.com/seglo/kafka-lag-exporter) was a huge inspiration for me
 creating my first real Elixir project. Thank you!
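The README shows how to run the exporter but not how Prometheus picks it up. A minimal scrape configuration sketch, assuming the exporter is reachable on `localhost:4000` and serves metrics on the default path; the job name and scrape interval are illustrative, not taken from the project:

```yaml
# prometheus.yml fragment (sketch) — job name, interval and target are assumptions
scrape_configs:
  - job_name: "kafkaex-lag-exporter"
    scrape_interval: 15s
    static_configs:
      - targets: ["localhost:4000"]
```

With this in place, the `kafka_consumergroup_group_lag` and `kafka_consumergroup_group_topic_sum_lag` series described above should appear in Prometheus after the first scrape.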
@@ -1,8 +0,0 @@
-kafka:
-  brokers:
-    - kafka1:9092
-    - kafka2:9092
-    - kafka3:9092
-
-# server:
-#   listenPort: 8080
@@ -32,7 +32,7 @@ services:
       - redpanda_network

   console:
-    image: docker.redpanda.com/redpandadata/console:v2.2.4
+    image: docker.redpanda.com/redpandadata/console:v2.4.6
     entrypoint: /bin/sh
     command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
     environment:

@@ -63,7 +63,6 @@ services:
     image: quay.io/cloudhut/owl-shop:latest
     networks:
       - redpanda_network
-    #platform: 'linux/amd64'
     environment:
       - SHOP_KAFKA_BROKERS=redpanda:29092
       - SHOP_KAFKA_TOPICREPLICATIONFACTOR=1

@@ -78,7 +77,6 @@ services:
     container_name: connect
     networks:
      - redpanda_network
-    #platform: 'linux/amd64'
    depends_on:
      - redpanda
    ports:

@@ -109,6 +107,8 @@ services:
       - '4000:4000'
     environment:
       - KAFKA_BROKERS=redpanda:29092
     networks:
       - redpanda_network
+    depends_on:
+      - redpanda
     restart: "unless-stopped"
@@ -1,15 +0,0 @@
-#!/usr/bin/env bash
-
-kind delete cluster
-
-kind create cluster --config deployment/kind.yaml
-
-kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m
-
-kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -
-
-kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml
-
-LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
-
-echo "address=/kind.cluster/$LB_IP"
@@ -1,21 +0,0 @@
-k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
-k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
-k8s_yaml('./redpanda-deployment.yaml')
-k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
-k8s_yaml('./redpanda-service.yaml')
-
-k8s_yaml('./connect-service.yaml')
-k8s_yaml('./connect-deployment.yaml')
-
-k8s_yaml('./owl-shop-deployment.yaml')
-
-k8s_yaml('./console-deployment.yaml')
-k8s_yaml('./console-service.yaml')
-k8s_yaml('./console-ingress.yaml')
-
-docker_build('kafka-lag-exporter', './..')
-
-k8s_yaml('./kafka-lag-exporter-deployment.yaml')
-k8s_yaml('./kafka-lag-exporter-service.yaml')
-
-k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)
@@ -1,48 +0,0 @@
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  creationTimestamp: null
-  labels:
-    io.kompose.service: connect
-  name: connect
-spec:
-  replicas: 1
-  selector:
-    matchLabels:
-      io.kompose.service: connect
-  template:
-    metadata:
-      labels:
-        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
-        io.kompose.service: connect
-    spec:
-      containers:
-        - env:
-            - name: CONNECT_BOOTSTRAP_SERVERS
-              value: redpanda:29092
-            - name: CONNECT_CONFIGURATION
-              value: |
-                key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
-                value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
-                group.id=connectors-cluster
-                offset.storage.topic=_internal_connectors_offsets
-                config.storage.topic=_internal_connectors_configs
-                status.storage.topic=_internal_connectors_status
-                config.storage.replication.factor=-1
-                offset.storage.replication.factor=-1
-                status.storage.replication.factor=-1
-                offset.flush.interval.ms=1000
-                producer.linger.ms=50
-                producer.batch.size=131072
-            - name: CONNECT_GC_LOG_ENABLED
-              value: "false"
-            - name: CONNECT_HEAP_OPTS
-              value: -Xms512M -Xmx512M
-            - name: CONNECT_LOG_LEVEL
-              value: info
-          image: docker.redpanda.com/redpandadata/connectors:latest
-          name: connect
-          ports:
-            - containerPort: 8083
-      hostname: connect
-      restartPolicy: Always
@@ -1,13 +0,0 @@
-apiVersion: v1
-kind: Service
-metadata:
-  labels:
-    io.kompose.service: connect
-  name: connect
-spec:
-  ports:
-    - name: "8083"
-      port: 8083
-      targetPort: 8083
-  selector:
-    io.kompose.service: connect
@@ -1,48 +0,0 @@
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  creationTimestamp: null
-  labels:
-    io.kompose.service: console
-  name: console
-spec:
-  replicas: 1
-  selector:
-    matchLabels:
-      io.kompose.service: console
-  template:
-    metadata:
-      labels:
-        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
-        io.kompose.service: console
-    spec:
-      containers:
-        - args:
-            - -c
-            - echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
-          command:
-            - /bin/sh
-          env:
-            - name: CONFIG_FILEPATH
-              value: /tmp/config.yml
-            - name: CONSOLE_CONFIG_FILE
-              value: |
-                kafka:
-                  brokers: ["redpanda:29092"]
-                  schemaRegistry:
-                    enabled: true
-                    urls: ["http://redpanda:8081"]
-                redpanda:
-                  adminApi:
-                    enabled: true
-                    urls: ["http://redpanda:9644"]
-                connect:
-                  enabled: true
-                  clusters:
-                    - name: local-connect-cluster
-                      url: http://connect:8083
-          image: docker.redpanda.com/redpandadata/console:v2.2.4
-          name: console
-          ports:
-            - containerPort: 8080
-      restartPolicy: Always
@@ -1,17 +0,0 @@
-apiVersion: networking.k8s.io/v1
-kind: Ingress
-metadata:
-  name: ingress-myserviceb
-spec:
-  rules:
-    - host: console.lechindianer.hack
-      http:
-        paths:
-          - path: /
-            pathType: Prefix
-            backend:
-              service:
-                name: console
-                port:
-                  number: 8080
-  ingressClassName: nginx
@@ -1,13 +0,0 @@
-apiVersion: v1
-kind: Service
-metadata:
-  labels:
-    io.kompose.service: console
-  name: console
-spec:
-  ports:
-    - name: "8080"
-      port: 8080
-      targetPort: 8080
-  selector:
-    io.kompose.service: console
@@ -1,26 +0,0 @@
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  labels:
-    io.kompose.service: kafka-lag-exporter
-  name: kafka-lag-exporter
-spec:
-  replicas: 1
-  selector:
-    matchLabels:
-      io.kompose.service: kafka-lag-exporter
-  template:
-    metadata:
-      labels:
-        io.kompose.network/redpanda-owl-shop-default: "true"
-        io.kompose.service: kafka-lag-exporter
-    spec:
-      containers:
-        - env:
-            - name: KAFKA_BROKERS
-              value: redpanda:29092
-          image: kafka-lag-exporter
-          name: kafka-lag-exporter
-          ports:
-            - containerPort: 4000
-      restartPolicy: Always
@@ -1,13 +0,0 @@
-apiVersion: v1
-kind: Service
-metadata:
-  labels:
-    io.kompose.service: kafka-lag-exporter
-  name: kafka-lag-exporter
-spec:
-  ports:
-    - name: "4000"
-      port: 4000
-      targetPort: 4000
-  selector:
-    io.kompose.service: kafka-lag-exporter
@@ -1,31 +0,0 @@
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  labels:
-    io.kompose.service: owl-shop
-  name: owl-shop
-spec:
-  replicas: 1
-  selector:
-    matchLabels:
-      io.kompose.service: owl-shop
-  template:
-    metadata:
-      creationTimestamp: null
-      labels:
-        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
-        io.kompose.service: owl-shop
-    spec:
-      containers:
-        - env:
-            - name: SHOP_KAFKA_BROKERS
-              value: redpanda:29092
-            - name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
-              value: "1"
-            - name: SHOP_TRAFFIC_INTERVAL_DURATION
-              value: 0.1s
-            - name: SHOP_TRAFFIC_INTERVAL_RATE
-              value: "6"
-          image: quay.io/cloudhut/owl-shop:latest
-          name: owl-shop
-      restartPolicy: Always
@@ -1,44 +0,0 @@
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  labels:
-    io.kompose.service: redpanda
-  name: redpanda
-spec:
-  replicas: 1
-  selector:
-    matchLabels:
-      io.kompose.service: redpanda
-  strategy:
-    type: Recreate
-  template:
-    metadata:
-      labels:
-        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
-        io.kompose.service: redpanda
-    spec:
-      containers:
-        - args:
-            - redpanda start
-            - --smp 1
-            - --overprovisioned
-            - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
-            - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
-            - --pandaproxy-addr 0.0.0.0:8082
-            - --advertise-pandaproxy-addr localhost:8082
-          image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
-          name: redpanda
-          ports:
-            - containerPort: 8081
-            - containerPort: 8082
-            - containerPort: 9092
-            - containerPort: 9644
-            - containerPort: 29092
-          volumeMounts:
-            - mountPath: /var/lib/redpanda/data
-              name: redpanda
-      restartPolicy: Always
-      volumes:
-        - name: redpanda
-          persistentVolumeClaim:
-            claimName: redpanda
@@ -1,13 +0,0 @@
-apiVersion: networking.k8s.io/v1
-kind: NetworkPolicy
-metadata:
-  name: redpanda-owl-shop-default
-spec:
-  ingress:
-    - from:
-        - podSelector:
-            matchLabels:
-              io.kompose.network/redpanda-owl-shop-default: "true"
-  podSelector:
-    matchLabels:
-      io.kompose.network/redpanda-owl-shop-default: "true"
@@ -1,13 +0,0 @@
-apiVersion: networking.k8s.io/v1
-kind: NetworkPolicy
-metadata:
-  name: redpanda-owl-shop-redpanda-network
-spec:
-  ingress:
-    - from:
-        - podSelector:
-            matchLabels:
-              io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
-  podSelector:
-    matchLabels:
-      io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
@@ -1,12 +0,0 @@
-apiVersion: v1
-kind: PersistentVolumeClaim
-metadata:
-  labels:
-    io.kompose.service: redpanda
-  name: redpanda
-spec:
-  accessModes:
-    - ReadWriteOnce
-  resources:
-    requests:
-      storage: 100Mi
@@ -1,25 +0,0 @@
-apiVersion: v1
-kind: Service
-metadata:
-  labels:
-    io.kompose.service: redpanda
-  name: redpanda
-spec:
-  ports:
-    - name: "8081"
-      port: 8081
-      targetPort: 8081
-    - name: "8082"
-      port: 8082
-      targetPort: 8082
-    - name: "9092"
-      port: 9092
-      targetPort: 9092
-    - name: "9644"
-      port: 9644
-      targetPort: 9644
-    - name: "29092"
-      port: 29092
-      targetPort: 29092
-  selector:
-    io.kompose.service: redpanda
@@ -1,13 +0,0 @@
-defmodule KafkaexLagExporter do
-  @moduledoc """
-  KafkaexLagExporter keeps the contexts that define your domain
-  and business logic.
-
-  Contexts are also responsible for managing your data, regardless
-  if it comes from the database, an external API or others.
-  """
-
-  def hello() do
-    :world
-  end
-end
@@ -15,7 +15,7 @@ defmodule KafkaexLagExporter.Application do
       {Phoenix.PubSub, name: KafkaexLagExporter.PubSub},
       # Start the Endpoint (http/https)
       KafkaexLagExporterWeb.Endpoint,
-      KafkaexLagExporter.ConsumerOffset
+      KafkaexLagExporter.ConsumerOffsetRunner
     ]

     # See https://hexdocs.pm/elixir/Supervisor.html
@@ -3,41 +3,28 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do

   @callback connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}

-  @callback resolve_offsets(binary, :earliest | :latest, atom) ::
-              list({non_neg_integer, integer})
-
-  @callback fetch_committed_offsets(binary, binary, atom) ::
-              list({non_neg_integer, non_neg_integer})
-
-  @callback lag(binary, binary, atom) :: list({non_neg_integer, integer})
-
-  @callback lag_total(binary, binary, atom) :: non_neg_integer
-
   @callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary)

-  @callback topic_names_for_consumer_groups(
+  @callback get_consumer_group_info(
               {host :: atom, port :: non_neg_integer},
               list(binary),
               list(binary)
-            ) :: list(binary)
+            ) :: list({consumer_group :: binary, topics :: list(binary)})
+
+  @callback lag(binary, binary, atom) :: list({non_neg_integer, integer})

   def connection(client), do: impl().connection(client)

-  def resolve_offsets(topic, type, client), do: impl().resolve_offsets(topic, type, client)
+  def get_consumer_group_names(endpoint), do: impl().get_consumer_group_names(endpoint)
+
+  def get_consumer_group_info(endpoint, list, consumer_group_names),
+    do: impl().get_consumer_group_info(endpoint, list, consumer_group_names)
+
+  def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client)

-  def fetch_committed_offsets(topic, consumer_group, client),
-    do: impl().fetch_committed_offsets(topic, consumer_group, client)
-
-  def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client)
-
-  def lag_total(topic, consumer_group, client),
-    do: impl().lag_total(topic, consumer_group, client)
-
-  def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port})
-
-  def topic_names_for_consumer_groups(endpoint, list, consumer_group_names),
-    do: impl().topic_names_for_consumer_groups(endpoint, list, consumer_group_names)
-
   defp impl,
     do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils)
 end
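The `impl/0` indirection in this behaviour module reads the concrete implementation from the application environment, which lets tests substitute a double for the real `KafkaexLagExporter.KafkaUtils`. A sketch of how that substitution could be wired up with Mox; the mock module name `KafkaexLagExporter.KafkaUtilsMock` is an assumption, not a name from the project:

```elixir
# test/test_helper.exs (sketch) — the mock module name is hypothetical
Mox.defmock(KafkaexLagExporter.KafkaUtilsMock, for: KafkaexLagExporter.KafkaUtils.Behaviour)

# config/test.exs — point the :kafka_utils key at the mock so impl/0 resolves to it
import Config
config :kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtilsMock
```

In production no configuration is needed: `Application.get_env/3` falls back to its third argument, the real `KafkaexLagExporter.KafkaUtils`.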
@@ -1,41 +1,17 @@
 defmodule KafkaexLagExporter.ConsumerOffset do
-  @moduledoc "Genserver implementation to set offset metrics for consumer groups"
+  @moduledoc "Struct holding all relevant telemetry information of consumers"

-  use GenServer
-
-  require Logger
-
-  @interval 5_000
-
-  def start_link(default) when is_list(default) do
-    GenServer.start_link(__MODULE__, default, name: __MODULE__)
-  end
-
-  @impl true
-  def init(_) do
-    Logger.info("Starting #{__MODULE__}")
-
-    clients = Application.get_env(:brod, :clients)
-    endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
-
-    Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
-
-    Process.send_after(self(), :tick, @interval)
-
-    {:ok, %{endpoints: endpoints}}
-  end
-
-  @impl true
-  def handle_info(:tick, state) do
-    [endpoint | _] = state.endpoints
-
-    {consumer_lags, consumer_lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
-
-    KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, consumer_lags)
-    KafkaexLagExporter.Metrics.group_sum_lag(endpoint, consumer_lag_sum)
-
-    Process.send_after(self(), :tick, @interval)
-
-    {:noreply, state}
-  end
+  @type t :: %__MODULE__{
+          consumer_group: binary,
+          topic: binary,
+          lag: list({partition :: non_neg_integer, lag :: non_neg_integer}),
+          consumer_id: binary,
+          member_host: binary
+        }
+
+  defstruct consumer_group: "",
+            topic: "",
+            lag: [],
+            consumer_id: "",
+            member_host: ""
 end
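The rewritten `ConsumerOffset` module is now a plain struct rather than a GenServer, replacing the positional lists passed around before. A sketch of what one of these values looks like; the field values are illustrative only:

```elixir
# Illustrative values — not taken from the project
offset = %KafkaexLagExporter.ConsumerOffset{
  consumer_group: "group-1",
  topic: "orders",
  lag: [{0, 12}, {1, 3}],      # per-partition {partition, lag} tuples
  consumer_id: "consumer-abc",
  member_host: "/10.0.0.5"
}
```

Carrying `consumer_id` and `member_host` on the struct is what makes the new `consumer_id`/`member_host` metric labels possible without threading extra arguments through every function.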
@@ -1,38 +1,49 @@
 defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
   @moduledoc "Calculate summarized lag for each consumer group"

   require Logger
+  alias KafkaexLagExporter.ConsumerOffset
+  alias KafkaexLagExporter.KafkaUtils

-  # TODO: change return type
-  @spec get(KafkaexLagExporter.KafkaWrapper.endpoint()) :: {any(), any()}
+  @spec get(KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint()) :: %{
+          lags: list(ConsumerOffset.t()),
+          sum: list(ConsumerOffset.t())
+        }
   def get(endpoint) do
-    consumer_group_names = KafkaexLagExporter.KafkaUtils.get_consumer_group_names(endpoint)
+    consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint)

     consumer_lags =
-      KafkaexLagExporter.KafkaUtils.topic_names_for_consumer_groups(
-        endpoint,
-        [],
-        consumer_group_names
-      )
-      |> Enum.map(fn [consumer_group, topics] ->
-        [consumer_group, get_lag_for_consumer(consumer_group, topics)]
-      end)
+      KafkaUtils.get_consumer_group_info(endpoint, [], consumer_group_names)
+      |> Enum.flat_map(&get_lag_per_topic(&1))

     consumer_lag_sum = get_lag_for_consumer_sum(consumer_lags)

+    %{lags: consumer_lags, sum: consumer_lag_sum}
   end

-  defp get_lag_for_consumer(consumer_group, topics) do
-    topics
-    |> Enum.flat_map(fn topic ->
-      KafkaexLagExporter.KafkaUtils.lag(topic, consumer_group, :client1)
+  @spec get_lag_per_topic(
+          {consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
+           member_host :: binary}
+        ) :: list(ConsumerOffset.t())
+  defp get_lag_per_topic({consumer_group, topics, consumer_id, member_host}) do
+    Enum.map(topics, fn topic ->
+      lag = KafkaUtils.lag(topic, consumer_group, :client1)
+
+      %ConsumerOffset{
+        consumer_group: consumer_group,
+        topic: topic,
+        lag: lag,
+        consumer_id: consumer_id,
+        member_host: member_host
+      }
     end)
   end

+  @spec get_lag_per_topic(list(ConsumerOffset.t())) :: list(ConsumerOffset.t())
   defp get_lag_for_consumer_sum(lags_per_consumer_group) do
-    lags_per_consumer_group
-    |> Enum.map(fn [topic, lag_per_partition] -> [topic, sum_topic_lag(lag_per_partition, 0)] end)
+    Enum.map(lags_per_consumer_group, fn consumer_offset ->
+      lag_sum = sum_topic_lag(consumer_offset.lag, 0)
+      %ConsumerOffset{consumer_offset | lag: {0, lag_sum}}
+    end)
   end

   defp sum_topic_lag([], acc), do: acc
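The hunk above ends at the base clause of `sum_topic_lag/2`; the recursive clause falls outside the diff context. A plausible completion, assuming each lag entry is a `{partition, lag}` tuple as the `ConsumerOffset` typespec states (the recursive clause below is a reconstruction, not taken from the repository):

```elixir
# Base clause as shown in the diff; recursive clause is a reconstruction
defp sum_topic_lag([], acc), do: acc

defp sum_topic_lag([{_partition, lag} | rest], acc),
  do: sum_topic_lag(rest, acc + lag)
```

With that shape, `sum_topic_lag([{0, 12}, {1, 3}], 0)` accumulates the per-partition lags into a single total for the topic.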
lib/kafkaex_lag_exporter/consumer_offset_runner.ex (new file, 45 lines)

@@ -0,0 +1,45 @@
+defmodule KafkaexLagExporter.ConsumerOffsetRunner do
+  @moduledoc "Genserver implementation to set offset metrics for consumer groups"
+
+  use GenServer
+
+  require Logger
+
+  def start_link(default) when is_list(default) do
+    GenServer.start_link(__MODULE__, default, name: __MODULE__)
+  end
+
+  @impl true
+  def init(_) do
+    Logger.info("Starting #{__MODULE__}")
+
+    clients = Application.get_env(:brod, :clients)
+    endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
+
+    interval =
+      System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
+      |> String.to_integer()
+
+    Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
+    Logger.info("Updating lag information every #{interval} milliseconds")
+
+    Process.send_after(self(), :tick, interval)
+
+    {:ok, %{endpoints: endpoints, interval: interval}}
+  end
+
+  @impl true
+  def handle_info(:tick, state) do
+    [endpoint | _] = state.endpoints
+    interval = state.interval
+
+    %{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
+
+    KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
+    KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
+
+    Process.send_after(self(), :tick, interval)
+
+    {:noreply, state}
+  end
+end
lib/kafkaex_lag_exporter/kafka_utils.ex (new file, 112 lines)

@@ -0,0 +1,112 @@
+# source code taken from https://github.com/reachfh/brod_group_subscriber_example
+
+defmodule KafkaexLagExporter.KafkaUtils do
+  @behaviour KafkaexLagExporter.KafkaUtils.Behaviour
+
+  @moduledoc "Utility functions for dealing with Kafka"
+
+  alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
+
+  @default_client :client1
+
+  def connection, do: connection(@default_client)
+
+  @impl true
+  def connection(client) do
+    clients = Application.get_env(:brod, :clients)
+    config = clients[client]
+
+    endpoints = config[:endpoints] || [{~c"localhost", 9092}]
+
+    sock_opts =
+      case Keyword.fetch(config, :ssl) do
+        {:ok, ssl_opts} ->
+          [ssl: ssl_opts]
+
+        :error ->
+          []
+      end
+
+    {endpoints, sock_opts}
+  end
+
+  @impl true
+  def get_consumer_group_names(endpoint) do
+    [{_, groups} | _] = KafkaWrapper.list_all_groups([endpoint], [])
+
+    groups
+    |> Enum.filter(fn {_, _, protocol} -> protocol === "consumer" end)
+    |> Enum.map(fn {_, group_name, _} -> group_name end)
+  end
+
+  @impl true
+  def get_consumer_group_info(endpoint, list \\ [], consumer_group_names) do
+    {:ok, group_descriptions} = KafkaWrapper.describe_groups(endpoint, list, consumer_group_names)
+
+    group_descriptions
+    |> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
+      get_member_info(consumer_group, members)
+    end)
+  end
+
+  @impl true
+  def lag(topic, consumer_group, client) do
+    offsets =
+      resolve_offsets(topic, client)
+      |> Enum.sort_by(fn {key, _value} -> key end)
+
+    committed_offsets =
+      fetch_committed_offsets(topic, consumer_group, client)
+      |> Enum.sort_by(fn {key, _value} -> key end)
+
+    for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
+      {part, current - committed}
+    end
+  end
+
+  @spec resolve_offsets(binary, atom) :: list({non_neg_integer, integer})
+  defp resolve_offsets(topic, client) do
+    {endpoints, sock_opts} = connection(client)
+
+    {:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
+
+    for i <- Range.new(0, partitions_count - 1),
+        {:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
+      {i, offset}
+    end
+  end
+
+  @spec fetch_committed_offsets(binary, binary, atom) ::
+          list({non_neg_integer, non_neg_integer})
+  defp fetch_committed_offsets(_topic, consumer_group, client) do
+    {endpoints, sock_opts} = connection(client)
+
+    {:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
+
+    for r <- response, pr <- r[:partitions] do
+      {pr[:partition_index], pr[:committed_offset]}
+    end
+  end
+
+  @spec get_member_info(
+          binary,
+          list(%{client_host: binary, member_assignment: binary, member_id: binary})
+        ) ::
+          list(
+            {consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
+             member_host :: binary}
+          )
+  defp get_member_info(consumer_group, members) do
+    members
+    |> Enum.map(fn %{
+                     client_host: member_host,
+                     member_assignment: member_assignment,
+                     member_id: consumer_id
+                   } ->
+      topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
+      {topics, consumer_id, member_host}
+    end)
+    |> Enum.map(fn {topics, consumer_id, member_host} ->
+      {consumer_group, topics, consumer_id, member_host}
+    end)
+  end
+end
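The core of `lag/3` in the new `KafkaUtils` module is plain arithmetic: zip the latest offsets against the committed offsets per partition and subtract. A worked example with illustrative numbers (the offset values are invented for demonstration):

```elixir
# Illustrative offsets — {partition, offset} tuples, sorted by partition
offsets = [{0, 120}, {1, 95}]      # latest produced offsets
committed = [{0, 100}, {1, 95}]    # last committed consumer offsets

for {{part, current}, {_, committed_off}} <- Enum.zip(offsets, committed) do
  {part, current - committed_off}
end
# => [{0, 20}, {1, 0}]
```

Partition 0 is 20 messages behind and partition 1 is fully caught up; these per-partition values feed `kafka_consumergroup_group_lag`, and their sum feeds `kafka_consumergroup_group_topic_sum_lag`.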
@@ -3,6 +3,8 @@ defmodule KafkaexLagExporter.Metrics do
|
||||
|
||||
use PromEx.Plugin
|
||||
|
||||
alias KafkaexLagExporter.ConsumerOffset
|
||||
|
||||
require Logger
|
||||
|
||||
@kafka_event :kafka
|
||||
@@ -21,52 +23,60 @@ defmodule KafkaexLagExporter.Metrics do
|
||||
event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
|
||||
description: "Sum of group offset lag across topic partitions",
|
||||
measurement: :lag,
|
||||
# TODO: add more tags like member_host, consumer_id, client_id, ...
|
||||
tags: [:cluster_name, :group, :topic]
|
||||
        tags: [:cluster_name, :group, :topic, :consumer_id, :member_host]
      ),
      last_value(
        [@kafka_event, :consumergroup, :group, :lag],
        event_name: [@kafka_event, :consumergroup, :group, :lag],
        description: "Group offset lag of a partition",
        measurement: :lag,
        # TODO: add more tags like member_host, consumer_id, client_id, ...
        tags: [:cluster_name, :group, :partition, :topic]
        tags: [:cluster_name, :group, :partition, :topic, :consumer_id, :member_host]
      )
    ]
  )
end

@spec group_sum_lag(
        KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
        list(ConsumerOffset.t())
      ) :: :ok
@doc false
def group_sum_lag({host, _port}, consumer_lags) do
  Enum.each(consumer_lags, fn [group_name, lag] ->
def group_sum_lag({host, _port}, consumer_offsets) do
  Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
    lag = elem(consumer_offset.lag, 1)

    :telemetry.execute(
      [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
      %{
        lag: lag
      },
      %{lag: lag},
      %{
        cluster_name: host,
        group: group_name,
        topic: []
        group: consumer_offset.consumer_group,
        topic: consumer_offset.topic,
        consumer_id: consumer_offset.consumer_id,
        member_host: consumer_offset.member_host
      }
    )
  end)
end

@spec group_lag_per_partition(
        KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
        list(ConsumerOffset.t())
      ) :: :ok
@doc false
def group_lag_per_partition({host, _port}, consumer_lags) do
  Enum.each(consumer_lags, fn [group_name, lags] ->
    Enum.each(lags, fn {partition, lag} ->
def group_lag_per_partition({host, _port}, consumer_offsets) do
  Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
    Enum.each(consumer_offset.lag, fn {partition, lag} ->
      :telemetry.execute(
        [@kafka_event, :consumergroup, :group, :lag],
        %{
          lag: lag
        },
        %{lag: lag},
        %{
          cluster_name: host,
          group: group_name,
          group: consumer_offset.consumer_group,
          partition: partition,
          topic: []
          topic: consumer_offset.topic,
          consumer_id: consumer_offset.consumer_id,
          member_host: consumer_offset.member_host
        }
      )
    end)
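The telemetry events above carry the lag as the measurement and the cluster/group/topic identifiers as tag metadata, which PromEx turns into labeled Prometheus gauges. As a rough illustration only (in Python, with an assumed metric name and label set, not taken from the project), one such event maps to a text-format sample like this:

```python
# Illustrative sketch: how a lag event with tag metadata might render as a
# Prometheus text-format sample. Metric name and labels are assumptions.
def format_sample(name, labels, value):
    """Render one Prometheus sample line: name{k="v",...} value"""
    label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    return f"{name}{{{label_str}}} {value}"

line = format_sample(
    "kafka_consumergroup_group_lag",
    {"cluster_name": "localhost", "group": "my-group",
     "partition": "0", "topic": "my-topic"},
    42,
)
print(line)
```

Each distinct combination of label values becomes its own time series on the `/metrics` endpoint, which is why the diff adds `consumer_id` and `member_host` as tags.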
@@ -1,56 +1,5 @@
defmodule KafkaexLagExporter.PromEx do
  @moduledoc """
  Be sure to add the following to finish setting up PromEx:

  1. Update your configuration (config.exs, dev.exs, prod.exs, releases.exs, etc) to
     configure the necessary bit of PromEx. Be sure to check out `PromEx.Config` for
     more details regarding configuring PromEx:
     ```
     config :kafkaex_lag_exporter, KafkaexLagExporter.PromEx,
       disabled: false,
       manual_metrics_start_delay: :no_delay,
       drop_metrics_groups: [],
       grafana: :disabled,
       metrics_server: :disabled
     ```

  2. Add this module to your application supervision tree. It should be one of the first
     things that is started so that no Telemetry events are missed. For example, if PromEx
     is started after your Repo module, you will miss Ecto's init events and the dashboards
     will be missing some data points:
     ```
     def start(_type, _args) do
       children = [
         KafkaexLagExporter.PromEx,

         ...
       ]

       ...
     end
     ```

  3. Update your `endpoint.ex` file to expose your metrics (or configure a standalone
     server using the `:metrics_server` config options). Be sure to put this plug before
     your `Plug.Telemetry` entry so that you can avoid having calls to your `/metrics`
     endpoint create their own metrics and logs which can pollute your logs/metrics given
     that Prometheus will scrape at a regular interval and that can get noisy:
     ```
     defmodule KafkaexLagExporterWeb.Endpoint do
       use Phoenix.Endpoint, otp_app: :kafkaex_lag_exporter

       ...

       plug PromEx.Plug, prom_ex_module: KafkaexLagExporter.PromEx

       ...
     end
     ```

  4. Update the list of plugins in the `plugins/0` function return list to reflect your
     application's dependencies. Also update the list of dashboards that are to be uploaded
     to Grafana in the `dashboards/0` function.
  """
  @moduledoc false

  use PromEx, otp_app: :kafkaex_lag_exporter
@@ -3,6 +3,7 @@ defmodule KafkaexLagExporter.TopicNameParser do

  @invalid_topic_characters ~r/[^[:alnum:]\-\._]/

  @spec parse_topic_names(binary) :: list(binary)
  def parse_topic_names(member_assignment) do
    member_assignment
    |> String.chunk(:printable)
@@ -1,75 +0,0 @@
# source code taken from https://github.com/reachfh/brod_group_subscriber_example

defmodule KafkaexLagExporter.KafkaUtils do
  @moduledoc "Utility functions for dealing with Kafka"

  @default_client :client1

  @type endpoint() :: {host :: atom(), port :: non_neg_integer()}

  def connection, do: connection(@default_client)
  @spec connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}
  def connection(client) do
    clients = Application.get_env(:brod, :clients)
    config = clients[client]

    endpoints = config[:endpoints] || [{~c"localhost", 9092}]

    sock_opts =
      case Keyword.fetch(config, :ssl) do
        {:ok, ssl_opts} ->
          [ssl: ssl_opts]

        :error ->
          []
      end

    {endpoints, sock_opts}
  end

  @spec resolve_offsets(binary(), :earliest | :latest, atom()) ::
          list({non_neg_integer(), integer()})
  def resolve_offsets(topic, type, client) do
    {endpoints, sock_opts} = connection(client)

    {:ok, partitions_count} = :brod.get_partitions_count(client, topic)

    for i <- Range.new(0, partitions_count - 1),
        {:ok, offset} = :brod.resolve_offset(endpoints, topic, i, type, sock_opts) do
      {i, offset}
    end
  end

  @spec fetch_committed_offsets(binary(), binary(), atom()) ::
          {non_neg_integer(), non_neg_integer()}
  def fetch_committed_offsets(_topic, consumer_group, client) do
    {endpoints, sock_opts} = connection(client)
    {:ok, response} = :brod.fetch_committed_offsets(endpoints, sock_opts, consumer_group)

    for r <- response,
        pr <- r[:partitions],
        do: {pr[:partition_index], pr[:committed_offset]}
  end

  @spec lag(binary(), binary(), atom()) :: list({non_neg_integer(), integer()})
  def lag(topic, consumer_group, client) do
    offsets =
      resolve_offsets(topic, :latest, client)
      |> Enum.sort_by(fn {key, _value} -> key end)

    committed_offsets =
      fetch_committed_offsets(topic, consumer_group, client)
      |> Enum.sort_by(fn {key, _value} -> key end)

    for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
      {part, current - committed}
    end
  end

  @spec lag_total(binary(), binary(), atom()) :: non_neg_integer()
  def lag_total(topic, consumer_group, client) do
    for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do
      acc -> acc + recs
    end
  end
end
9 mix.exs
@@ -37,12 +37,13 @@ defmodule KafkaexLagExporter.MixProject do
      {:telemetry_poller, "~> 1.0"},
      {:jason, "~> 1.4.0"},
      {:plug_cowboy, "~> 2.6.1"},
      {:credo, "~> 1.7.5", only: [:dev, :test], runtime: false},
      {:dialyxir, "~> 1.0", only: [:dev], runtime: false},
      {:mox, "~> 1.1", only: :test},
      {:brod, "~> 3.17.0"},
      {:prom_ex, "~> 1.8.0"},
      {:telemetry, "~> 1.2"}
      {:telemetry, "~> 1.2"},
      {:credo, "~> 1.7.5", only: [:dev, :test], runtime: false},
      {:dialyxir, "~> 1.0", only: [:dev], runtime: false},
      {:patch, "~> 0.13.0", only: :test},
      {:mix_test_watch, "~> 1.2.0", only: [:dev, :test], runtime: false}
    ]
  end
end

3 mix.lock
@@ -16,10 +16,11 @@
  "kafka_protocol": {:hex, :kafka_protocol, "4.1.5", "d15e64994a8ca99716ab47db4132614359ac1bfa56d6c5b4341fdc1aa4041518", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "c956c9357fef493b7072a35d0c3e2be02aa5186c804a412d29e62423bb15e5d9"},
  "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
  "mint": {:hex, :mint, "1.5.2", "4805e059f96028948870d23d7783613b7e6b0e2fb4e98d720383852a760067fd", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "d77d9e9ce4eb35941907f1d3df38d8f750c357865353e21d335bdcdf6d892a02"},
  "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
  "mix_test_watch": {:hex, :mix_test_watch, "1.2.0", "1f9acd9e1104f62f280e30fc2243ae5e6d8ddc2f7f4dc9bceb454b9a41c82b42", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "278dc955c20b3fb9a3168b5c2493c2e5cffad133548d307e0a50c7f2cfbf34f6"},
  "nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
  "nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
  "octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},
  "patch": {:hex, :patch, "0.13.0", "da48728f9086a835956200a671210fe88f67ff48bb1f92626989886493ac2081", [:mix], [], "hexpm", "d65a840d485dfa05bf6673269b56680e7537a05050684e713de125a351b28112"},
  "phoenix": {:hex, :phoenix, "1.6.16", "e5bdd18c7a06da5852a25c7befb72246de4ddc289182285f8685a40b7b5f5451", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 1.0 or ~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e15989ff34f670a96b95ef6d1d25bad0d9c50df5df40b671d8f4a669e050ac39"},
  "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"},
  "phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"},
96 test/consumer_offset_fetcher_test.exs (new file)
@@ -0,0 +1,96 @@
defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
  use ExUnit.Case
  use Patch

  alias KafkaexLagExporter.ConsumerOffset

  @test_consumer_group_name1 "test_consumer_1"
  @test_consumer_group_name2 "test_consumer_2"
  @test_lags1 [{0, 23}, {1, 42}, {2, 666}]
  @test_lags2 [{0, 1}, {1, 2}, {2, 3}]
  @test_topic1 "test_topic_1"
  @test_topic2 "test_topic_2"
  @test_topic3 "test_topic_3"
  @test_consumer_id1 "test_consumer_id1"
  @test_consumer_id2 "test_consumer_id2"
  @test_member_host "127.0.0.1"

  setup do
    patch(
      KafkaexLagExporter.KafkaUtils,
      :get_consumer_group_names,
      fn _ -> [@test_consumer_group_name1, @test_consumer_group_name2] end
    )

    patch(KafkaexLagExporter.KafkaUtils, :lag, &lag(&1, &2, &3))

    patch(
      KafkaexLagExporter.KafkaUtils,
      :get_consumer_group_info,
      fn _, _, _ ->
        [
          {@test_consumer_group_name1, [@test_topic1, @test_topic2], @test_consumer_id1,
           @test_member_host},
          {@test_consumer_group_name2, [@test_topic3], @test_consumer_id2, @test_member_host}
        ]
      end
    )

    :ok
  end

  test "should return the calculated lags" do
    %{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get({"test endpoint", 666})

    assert sum == [
             %ConsumerOffset{
               consumer_group: @test_consumer_group_name1,
               topic: @test_topic1,
               lag: {0, 731},
               consumer_id: @test_consumer_id1,
               member_host: @test_member_host
             },
             %ConsumerOffset{
               consumer_group: @test_consumer_group_name1,
               topic: @test_topic2,
               lag: {0, 6},
               consumer_id: @test_consumer_id1,
               member_host: @test_member_host
             },
             %ConsumerOffset{
               consumer_group: @test_consumer_group_name2,
               topic: @test_topic3,
               lag: {0, 6},
               consumer_id: @test_consumer_id2,
               member_host: @test_member_host
             }
           ]

    assert lags == [
             %ConsumerOffset{
               consumer_group: @test_consumer_group_name1,
               topic: @test_topic1,
               lag: @test_lags1,
               consumer_id: @test_consumer_id1,
               member_host: @test_member_host
             },
             %ConsumerOffset{
               consumer_group: @test_consumer_group_name1,
               topic: @test_topic2,
               lag: @test_lags2,
               consumer_id: @test_consumer_id1,
               member_host: @test_member_host
             },
             %ConsumerOffset{
               consumer_group: @test_consumer_group_name2,
               topic: @test_topic3,
               lag: @test_lags2,
               consumer_id: @test_consumer_id2,
               member_host: @test_member_host
             }
           ]
  end

  defp lag(@test_topic1, _, _), do: @test_lags1
  defp lag(_, _, _), do: @test_lags2
end
195 test/kafka_utils_test.exs (new file)
@@ -0,0 +1,195 @@
defmodule KafkaexLagExporter.KafkaUtils.Test do
  use ExUnit.Case
  use Patch

  alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
  alias KafkaexLagExporter.KafkaUtils

  @test_endpoint {"test_host", "1234"}
  @test_sock_opts [ssl: []]
  @test_group_name1 "test-consumer_group1"
  @test_group_name2 "test-consumer_group"
  @test_topic_name "test-topic_name"

  setup do
    patch(
      KafkaUtils,
      :connection,
      fn _ -> {@test_endpoint, @test_sock_opts} end
    )

    :ok
  end

  describe "get_consumer_group_names" do
    setup do
      consumer_info1 = {:a, @test_group_name1, "consumer"}
      consumer_info2 = {:b, @test_group_name2, "something-other"}

      patch(KafkaWrapper, :list_all_groups, fn _, _ ->
        [{@test_endpoint, [consumer_info1, consumer_info2]}]
      end)

      group_names = KafkaUtils.get_consumer_group_names(@test_endpoint)

      [group_names: group_names]
    end

    test "should list all groups" do
      assert_called(KafkaWrapper.list_all_groups([@test_endpoint], []))
    end

    test "should filter for consumer groups and return names", context do
      assert context[:group_names] === [@test_group_name1]
    end
  end

  describe "get_consumer_group_info" do
    setup do
      consumer_group_name = "test-group-id1"
      member_assignment = "test_member_assignment"
      consumer_id = "test_consumer_id"
      member_host = "test_member_host"

      patch(KafkaWrapper, :describe_groups, fn _, _, _ ->
        {
          :ok,
          [
            %{
              group_id: consumer_group_name,
              members: [
                %{
                  client_host: member_host,
                  member_assignment: member_assignment,
                  member_id: consumer_id
                }
              ]
            }
          ]
        }
      end)

      patch(KafkaexLagExporter.TopicNameParser, :parse_topic_names, fn _ -> [@test_topic_name] end)

      group_info =
        KafkaUtils.get_consumer_group_info(@test_endpoint, [], [
          @test_group_name1,
          @test_group_name2
        ])

      [
        member_host: member_host,
        group_info: group_info,
        consumer_group_name: consumer_group_name,
        member_assignment: member_assignment,
        consumer_id: consumer_id
      ]
    end

    test "should describe groups" do
      assert_called(
        KafkaWrapper.describe_groups(@test_endpoint, [], [@test_group_name1, @test_group_name2])
      )
    end

    test "should parse topic names", context do
      expected_member_assignment = context[:expected_member_assignment]

      assert_called(
        KafkaexLagExporter.TopicNameParser.parse_topic_names(expected_member_assignment)
      )
    end

    test "should filter for consumer groups and return names", context do
      expected_consumer_group_name = context[:consumer_group_name]
      expected_consumer_id = context[:consumer_id]
      expected_member_host = context[:member_host]

      assert context[:group_info] === [
               {
                 expected_consumer_group_name,
                 [@test_topic_name],
                 expected_consumer_id,
                 expected_member_host
               }
             ]
    end
  end

  describe "lag" do
    setup do
      consumer_group = "test-consumer-group"
      client = :client1
      committed_offset1 = 23
      committed_offset2 = 42
      resolved_offset = 50

      partition_info1 = %{partition_index: 0, committed_offset: committed_offset1}
      partition_info2 = %{partition_index: 1, committed_offset: committed_offset2}

      patch(KafkaWrapper, :get_partitions_count, fn _, _ -> {:ok, 2} end)

      patch(KafkaWrapper, :resolve_offset, fn _, _, _, _, _ -> {:ok, resolved_offset} end)

      patch(KafkaWrapper, :fetch_committed_offsets, fn _, _, _ ->
        {:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
      end)

      lag = KafkaUtils.lag(@test_topic_name, consumer_group, client)

      [
        lag: lag,
        consumer_group: consumer_group,
        client: client,
        committed_offset1: committed_offset1,
        committed_offset2: committed_offset2,
        resolved_offset: resolved_offset
      ]
    end

    test "should start connection", context do
      expected_client = context[:client]

      assert_called(KafkaUtils.connection(expected_client))
    end

    test "should get partition count", context do
      expected_client = context[:client]

      assert_called(KafkaWrapper.get_partitions_count(expected_client, @test_topic_name))
    end

    test "should resolve offsets for each partition" do
      assert_called(
        KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 0, :latest, @test_sock_opts)
      )

      assert_called(
        KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 1, :latest, @test_sock_opts)
      )
    end

    test "should fetch committed offsets", context do
      expected_consumer_group = context[:consumer_group]

      assert_called(
        KafkaWrapper.fetch_committed_offsets(
          @test_endpoint,
          @test_sock_opts,
          expected_consumer_group
        )
      )
    end

    test "should return calculated lag per partition", context do
      expected_lag_partition_1 = context[:resolved_offset] - context[:committed_offset1]
      expected_lag_partition_2 = context[:resolved_offset] - context[:committed_offset2]

      assert context[:lag] === [
               {0, expected_lag_partition_1},
               {1, expected_lag_partition_2}
             ]
    end
  end
end
@@ -1,8 +0,0 @@
defmodule KafkaexLagExporterTest do
  use ExUnit.Case
  doctest KafkaexLagExporter

  test "greets the world" do
    assert KafkaexLagExporter.hello() == :world
  end
end

@@ -1,17 +0,0 @@
defmodule KafkaexLagExporterWeb.ErrorViewTest do
  use KafkaexLagExporterWeb.ConnCase, async: true

  # Bring render/3 and render_to_string/3 for testing custom views
  import Phoenix.View

  test "renders 404.json" do
    assert render(KafkaexLagExporterWeb.ErrorView, "404.json", []) == %{
             errors: %{detail: "Not Found"}
           }
  end

  test "renders 500.json" do
    assert render(KafkaexLagExporterWeb.ErrorView, "500.json", []) ==
             %{errors: %{detail: "Internal Server Error"}}
  end
end
@@ -1,34 +0,0 @@
defmodule KafkaexLagExporterWeb.ChannelCase do
  @moduledoc """
  This module defines the test case to be used by
  channel tests.

  Such tests rely on `Phoenix.ChannelTest` and also
  import other functionality to make it easier
  to build common data structures and query the data layer.

  Finally, if the test case interacts with the database,
  we enable the SQL sandbox, so changes done to the database
  are reverted at the end of every test. If you are using
  PostgreSQL, you can even run database tests asynchronously
  by setting `use KafkaexLagExporterWeb.ChannelCase, async: true`, although
  this option is not recommended for other databases.
  """

  use ExUnit.CaseTemplate

  using do
    quote do
      # Import conveniences for testing with channels
      import Phoenix.ChannelTest
      import KafkaexLagExporterWeb.ChannelCase

      # The default endpoint for testing
      @endpoint KafkaexLagExporterWeb.Endpoint
    end
  end

  setup _tags do
    :ok
  end
end

@@ -1,37 +0,0 @@
defmodule KafkaexLagExporterWeb.ConnCase do
  @moduledoc """
  This module defines the test case to be used by
  tests that require setting up a connection.

  Such tests rely on `Phoenix.ConnTest` and also
  import other functionality to make it easier
  to build common data structures and query the data layer.

  Finally, if the test case interacts with the database,
  we enable the SQL sandbox, so changes done to the database
  are reverted at the end of every test. If you are using
  PostgreSQL, you can even run database tests asynchronously
  by setting `use KafkaexLagExporterWeb.ConnCase, async: true`, although
  this option is not recommended for other databases.
  """

  use ExUnit.CaseTemplate

  using do
    quote do
      # Import conveniences for testing with connections
      import Plug.Conn
      import Phoenix.ConnTest
      import KafkaexLagExporterWeb.ConnCase

      alias KafkaexLagExporterWeb.Router.Helpers, as: Routes

      # The default endpoint for testing
      @endpoint KafkaexLagExporterWeb.Endpoint
    end
  end

  setup _tags do
    {:ok, conn: Phoenix.ConnTest.build_conn()}
  end
end
@@ -1,2 +1 @@
Application.ensure_all_started(:mox)
ExUnit.start()

@@ -1,6 +1,5 @@
defmodule KafkaexLagExporterTopicNameParserTest do
  use ExUnit.Case
  doctest KafkaexLagExporter.TopicNameParser
  use ExUnit.Case, async: true

  test "should parse single topic" do
    test_member_assignment =