Compare commits

13 commits: baed79e1ba...main

| SHA1 |
|---|
| 29c032cb16 |
| a34e91770f |
| 98e79b927d |
| 9e9676d4e4 |
| bf8389b4e0 |
| ee5b69f646 |
| 7104414bc1 |
| d580b0f1d0 |
| 0312a1e37c |
| c98e8da2ec |
| 0016cd0f74 |
| b098385e39 |
| eae84fef1f |
@@ -1,4 +1,2 @@
elixir 1.16.2-otp-26
erlang 26.0.2
tilt 0.33.11
kind 0.22.0
LICENSE (new file, 12 lines)
@@ -0,0 +1,12 @@
ISC License

Copyright 2024 Pascal Schmid

Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
granted, provided that the above copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
OF THIS SOFTWARE.
README.md (57 lines)
@@ -1,14 +1,43 @@
# KafkaexLagExporter
# KafkaExLagExporter

This project collects Kafka consumer lag and provides it via Prometheus.

## Metrics

[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner.
KafkaExLagExporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus.

**`kafka_consumergroup_group_topic_sum_lag`**

Labels: `cluster_name, group, topic, consumer_id, member_host`

The sum of the difference between the last produced offset and the last consumed offset of all partitions in this
topic for this group.

**`kafka_consumergroup_group_lag`**

Labels: `cluster_name, group, partition, topic, member_host, consumer_id`

The difference between the last produced offset and the last consumed offset for this partition in this topic
for this group.
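Both metrics come down to the same arithmetic: per-partition lag is the latest produced offset minus the last committed offset, and the `*_sum_lag` variant folds those per-partition values together per topic. A minimal Elixir sketch of that calculation with illustrative numbers (the exporter's real pipeline appears in the `KafkaUtils` hunks further down):

```elixir
# {partition, offset} pairs, already sorted by partition
latest_offsets = [{0, 50}, {1, 50}]        # last produced offset per partition
committed_offsets = [{0, 23}, {1, 42}]     # last committed offset per partition

# Per-partition lag (kafka_consumergroup_group_lag):
lags =
  for {{part, latest}, {_part, committed}} <- Enum.zip(latest_offsets, committed_offsets) do
    {part, latest - committed}
  end

# => [{0, 27}, {1, 8}]

# Topic-wide sum (kafka_consumergroup_group_topic_sum_lag):
sum_lag =
  for {_part, lag} <- lags, reduce: 0 do
    acc -> acc + lag
  end

# => 35
```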
## Start

```bash
docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 lechindianer/kafkaex_lag_exporter:0.1
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
```

Now you can check the exposed metrics at [localhost:4000](localhost:4000).
Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).

## Configuration

KafkaExLagExporter uses 5 seconds as the default interval for updating the lags. To use another
value, set `KAFKA_EX_INTERVAL_MS`, e.g.

```bash
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
  lechindianer/kafkaex_lag_exporter:0.2.0
```
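The interval is read once at startup, as the `ConsumerOffsetRunner` hunk further down shows; the relevant lines, verbatim from this diff:

```elixir
interval =
  System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
  |> String.to_integer()
```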
## Developing

@@ -18,24 +47,38 @@ To start the project locally:
KAFKA_BROKERS="localhost:9092" iex -S mix
```

There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
KafkaexLagExporter:

```bash
docker-compose up --build
docker compose up --build
```

Kowl is served at [localhost:8080](localhost:8080).
Kowl is served at [http://localhost:8080](http://localhost:8080).

### Tests

```bash
MIX_ENV=test mix test --no-start
```

# Don't forget to check credo for code violations:
### Code style

Don't forget to check [credo](https://hexdocs.pm/credo/overview.html) for code violations:

```bash
mix credo
```

This project also uses typespecs to provide static code checking:

```bash
mix dialyzer
```

## Links

Source is on [Gitlab](https://gitlab.com/lechindianer/kafkaex-lag-exporter).

The initial project [Kafka Lag Exporter](https://github.com/seglo/kafka-lag-exporter) was a huge inspiration for me
creating my first real Elixir project. Thank you!
@@ -1,8 +0,0 @@
kafka:
  brokers:
    - kafka1:9092
    - kafka2:9092
    - kafka3:9092

# server:
#   listenPort: 8080
@@ -32,7 +32,7 @@ services:
      - redpanda_network

  console:
    image: docker.redpanda.com/redpandadata/console:v2.2.4
    image: docker.redpanda.com/redpandadata/console:v2.4.6
    entrypoint: /bin/sh
    command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
    environment:
@@ -63,7 +63,6 @@ services:
    image: quay.io/cloudhut/owl-shop:latest
    networks:
      - redpanda_network
    #platform: 'linux/amd64'
    environment:
      - SHOP_KAFKA_BROKERS=redpanda:29092
      - SHOP_KAFKA_TOPICREPLICATIONFACTOR=1
@@ -78,7 +77,6 @@ services:
    container_name: connect
    networks:
      - redpanda_network
    #platform: 'linux/amd64'
    depends_on:
      - redpanda
    ports:
@@ -109,6 +107,8 @@ services:
      - '4000:4000'
    environment:
      - KAFKA_BROKERS=redpanda:29092
    networks:
      - redpanda_network
    depends_on:
      - redpanda
    restart: "unless-stopped"
@@ -1,15 +0,0 @@
#!/usr/bin/env bash

kind delete cluster

kind create cluster --config kind.yaml

kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m

kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -

kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml

LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')

echo "address=/kind.cluster/$LB_IP"
kind.yaml (24 lines)
@@ -1,24 +0,0 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
    kubeadmConfigPatches:
      - |
        kind: InitConfiguration
        nodeRegistration:
          kubeletExtraArgs:
            node-labels: "ingress-ready=true"
    extraPortMappings:
      - containerPort: 80
        hostPort: 80
        protocol: TCP
      - containerPort: 443
        hostPort: 443
        protocol: TCP
    # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
  - role: worker
    # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
  - role: worker
    # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
  - role: worker
    # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
@@ -1,21 +0,0 @@
k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
k8s_yaml('./redpanda-deployment.yaml')
k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
k8s_yaml('./redpanda-service.yaml')

k8s_yaml('./connect-service.yaml')
k8s_yaml('./connect-deployment.yaml')

k8s_yaml('./owl-shop-deployment.yaml')

k8s_yaml('./console-deployment.yaml')
k8s_yaml('./console-service.yaml')
k8s_yaml('./console-ingress.yaml')

docker_build('kafka-lag-exporter', './..')

k8s_yaml('./kafka-lag-exporter-deployment.yaml')
k8s_yaml('./kafka-lag-exporter-service.yaml')

k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)
@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    io.kompose.service: connect
  name: connect
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: connect
  template:
    metadata:
      labels:
        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
        io.kompose.service: connect
    spec:
      containers:
        - env:
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: redpanda:29092
            - name: CONNECT_CONFIGURATION
              value: |
                key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
                value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
                group.id=connectors-cluster
                offset.storage.topic=_internal_connectors_offsets
                config.storage.topic=_internal_connectors_configs
                status.storage.topic=_internal_connectors_status
                config.storage.replication.factor=-1
                offset.storage.replication.factor=-1
                status.storage.replication.factor=-1
                offset.flush.interval.ms=1000
                producer.linger.ms=50
                producer.batch.size=131072
            - name: CONNECT_GC_LOG_ENABLED
              value: "false"
            - name: CONNECT_HEAP_OPTS
              value: -Xms512M -Xmx512M
            - name: CONNECT_LOG_LEVEL
              value: info
          image: docker.redpanda.com/redpandadata/connectors:latest
          name: connect
          ports:
            - containerPort: 8083
      hostname: connect
      restartPolicy: Always
@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  labels:
    io.kompose.service: connect
  name: connect
spec:
  ports:
    - name: "8083"
      port: 8083
      targetPort: 8083
  selector:
    io.kompose.service: connect
@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    io.kompose.service: console
  name: console
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: console
  template:
    metadata:
      labels:
        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
        io.kompose.service: console
    spec:
      containers:
        - args:
            - -c
            - echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
          command:
            - /bin/sh
          env:
            - name: CONFIG_FILEPATH
              value: /tmp/config.yml
            - name: CONSOLE_CONFIG_FILE
              value: |
                kafka:
                  brokers: ["redpanda:29092"]
                  schemaRegistry:
                    enabled: true
                    urls: ["http://redpanda:8081"]
                redpanda:
                  adminApi:
                    enabled: true
                    urls: ["http://redpanda:9644"]
                connect:
                  enabled: true
                  clusters:
                    - name: local-connect-cluster
                      url: http://connect:8083
          image: docker.redpanda.com/redpandadata/console:v2.2.4
          name: console
          ports:
            - containerPort: 8080
      restartPolicy: Always
@@ -1,17 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ingress-myserviceb
spec:
  rules:
    - host: console.lechindianer.hack
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: console
                port:
                  number: 8080
  ingressClassName: nginx
@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  labels:
    io.kompose.service: console
  name: console
spec:
  ports:
    - name: "8080"
      port: 8080
      targetPort: 8080
  selector:
    io.kompose.service: console
@@ -1,26 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    io.kompose.service: kafka-lag-exporter
  name: kafka-lag-exporter
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: kafka-lag-exporter
  template:
    metadata:
      labels:
        io.kompose.network/redpanda-owl-shop-default: "true"
        io.kompose.service: kafka-lag-exporter
    spec:
      containers:
        - env:
            - name: KAFKA_BROKERS
              value: redpanda:29092
          image: kafka-lag-exporter
          name: kafka-lag-exporter
          ports:
            - containerPort: 4000
      restartPolicy: Always
@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  labels:
    io.kompose.service: kafka-lag-exporter
  name: kafka-lag-exporter
spec:
  ports:
    - name: "4000"
      port: 4000
      targetPort: 4000
  selector:
    io.kompose.service: kafka-lag-exporter
@@ -1,31 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    io.kompose.service: owl-shop
  name: owl-shop
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: owl-shop
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
        io.kompose.service: owl-shop
    spec:
      containers:
        - env:
            - name: SHOP_KAFKA_BROKERS
              value: redpanda:29092
            - name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
              value: "1"
            - name: SHOP_TRAFFIC_INTERVAL_DURATION
              value: 0.1s
            - name: SHOP_TRAFFIC_INTERVAL_RATE
              value: "6"
          image: quay.io/cloudhut/owl-shop:latest
          name: owl-shop
      restartPolicy: Always
@@ -1,44 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    io.kompose.service: redpanda
  name: redpanda
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: redpanda
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
        io.kompose.service: redpanda
    spec:
      containers:
        - args:
            - redpanda start
            - --smp 1
            - --overprovisioned
            - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
            - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
            - --pandaproxy-addr 0.0.0.0:8082
            - --advertise-pandaproxy-addr localhost:8082
          image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
          name: redpanda
          ports:
            - containerPort: 8081
            - containerPort: 8082
            - containerPort: 9092
            - containerPort: 9644
            - containerPort: 29092
          volumeMounts:
            - mountPath: /var/lib/redpanda/data
              name: redpanda
      restartPolicy: Always
      volumes:
        - name: redpanda
          persistentVolumeClaim:
            claimName: redpanda
@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: redpanda-owl-shop-default
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              io.kompose.network/redpanda-owl-shop-default: "true"
  podSelector:
    matchLabels:
      io.kompose.network/redpanda-owl-shop-default: "true"
@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: redpanda-owl-shop-redpanda-network
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
  podSelector:
    matchLabels:
      io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
@@ -1,12 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    io.kompose.service: redpanda
  name: redpanda
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Mi
@@ -1,25 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  labels:
    io.kompose.service: redpanda
  name: redpanda
spec:
  ports:
    - name: "8081"
      port: 8081
      targetPort: 8081
    - name: "8082"
      port: 8082
      targetPort: 8082
    - name: "9092"
      port: 9092
      targetPort: 9092
    - name: "9644"
      port: 9644
      targetPort: 9644
    - name: "29092"
      port: 29092
      targetPort: 29092
  selector:
    io.kompose.service: redpanda
@@ -3,16 +3,6 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do

  @callback connection(atom) :: {list({charlist, non_neg_integer}), Keyword.t()}

  @callback resolve_offsets(binary, :earliest | :latest, atom) ::
              list({non_neg_integer, integer})

  @callback fetch_committed_offsets(binary, binary, atom) ::
              list({non_neg_integer, non_neg_integer})

  @callback lag(binary, binary, atom) :: list({non_neg_integer, integer})

  @callback lag_total(binary, binary, atom) :: non_neg_integer

  @callback get_consumer_group_names({host :: atom, port :: non_neg_integer}) :: list(binary)

  @callback get_consumer_group_info(
@@ -21,23 +11,20 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
          list(binary)
        ) :: list({consumer_group :: binary, topics :: list(binary)})

  @callback lag(binary, binary, atom) :: list({non_neg_integer, integer})

  def connection(client), do: impl().connection(client)

  def resolve_offsets(topic, type, client), do: impl().resolve_offsets(topic, type, client)

  def fetch_committed_offsets(topic, consumer_group, client),
    do: impl().fetch_committed_offsets(topic, consumer_group, client)

  def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client)

  def lag_total(topic, consumer_group, client),
    do: impl().lag_total(topic, consumer_group, client)

  def get_consumer_group_names({host, port}), do: impl().get_consumer_group_names({host, port})
  def get_consumer_group_names(endpoint), do: impl().get_consumer_group_names(endpoint)

  def get_consumer_group_info(endpoint, list, consumer_group_names),
    do: impl().get_consumer_group_info(endpoint, list, consumer_group_names)

  def lag(topic, consumer_group, client), do: impl().lag(topic, consumer_group, client)

  def fetch_committed_offsets(topic, consumer_group, client),
    do: impl().fetch_committed_offsets(topic, consumer_group, client)

  defp impl,
    do: Application.get_env(:kafkaex_lag_exporter, :kafka_utils, KafkaexLagExporter.KafkaUtils)
end
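The reshuffling above keeps the module's double role intact: it is both the behaviour definition and the runtime dispatcher, with every public `def` delegating through `impl/0` so tests can swap the implementation via application config. A minimal sketch of the same pattern (module names here are illustrative, not from this repository):

```elixir
defmodule MyApp.Clock.Behaviour do
  @callback now() :: DateTime.t()

  # Callers use this entry point; it resolves the implementation at runtime.
  def now, do: impl().now()

  # Defaults to the real module; a test can run
  # `Application.put_env(:my_app, :clock, MyApp.FakeClock)` to swap it out.
  defp impl, do: Application.get_env(:my_app, :clock, MyApp.Clock)
end

defmodule MyApp.Clock do
  @behaviour MyApp.Clock.Behaviour

  @impl true
  def now, do: DateTime.utc_now()
end
```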
@@ -4,10 +4,9 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
  alias KafkaexLagExporter.ConsumerOffset
  alias KafkaexLagExporter.KafkaUtils

  # TODO fix type
  @spec get(KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint()) :: %{
          lags: list(binary),
          sum: list(binary)
          lags: list(ConsumerOffset.t()),
          sum: list(ConsumerOffset.t())
        }
  def get(endpoint) do
    consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint)
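The corrected `@spec` returns `ConsumerOffset` structs rather than bare binaries. The struct definition itself is not part of this compare, but the `Metrics` hunk below reads `consumer_offset.lag` as a tuple, so a plausible shape might look like this (purely an assumption, for orientation only):

```elixir
# Hypothetical sketch; the real module is not shown in this compare view.
defmodule KafkaexLagExporter.ConsumerOffset do
  defstruct [:group, :topic, :consumer_id, :member_host, :lag]

  @type t :: %__MODULE__{lag: {non_neg_integer, integer}}
end
```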
@@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do

  require Logger

  @interval 5_000

  def start_link(default) when is_list(default) do
    GenServer.start_link(__MODULE__, default, name: __MODULE__)
  end
@@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
    clients = Application.get_env(:brod, :clients)
    endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]

    interval =
      System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
      |> String.to_integer()

    Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
    Logger.info("Updating lag information every #{interval} milliseconds")

    Process.send_after(self(), :tick, @interval)
    Process.send_after(self(), :tick, interval)

    {:ok, %{endpoints: endpoints}}
    {:ok, %{endpoints: endpoints, interval: interval}}
  end

  @impl true
  def handle_info(:tick, state) do
    [endpoint | _] = state.endpoints
    interval = state.interval

    %{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
    %{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)

    KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
    KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)

    Process.send_after(self(), :tick, @interval)
    Process.send_after(self(), :tick, interval)

    {:noreply, state}
  end
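The net effect of this hunk: the compile-time `@interval` module attribute is replaced by a value read from the environment in `init/1` and threaded through GenServer state. The resulting self-scheduling loop, condensed into a standalone sketch:

```elixir
defmodule TickLoop do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    # Read the interval once, at startup, with the same default as above.
    interval =
      System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
      |> String.to_integer()

    Process.send_after(self(), :tick, interval)
    {:ok, %{interval: interval}}
  end

  @impl true
  def handle_info(:tick, state) do
    # ... fetch lags and emit metrics here ...

    # Re-arm the timer with the interval kept in state.
    Process.send_after(self(), :tick, state.interval)
    {:noreply, state}
  end
end
```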
@@ -7,8 +7,6 @@ defmodule KafkaexLagExporter.KafkaUtils do

  alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper

  require Logger

  @default_client :client1

  def connection, do: connection(@default_client)
@@ -32,54 +30,8 @@ defmodule KafkaexLagExporter.KafkaUtils do
  end

  @impl true
  def resolve_offsets(topic, type, client) do
    {endpoints, sock_opts} = connection(client)

    {:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)

    for i <- Range.new(0, partitions_count - 1),
        {:ok, offset} =
          KafkaWrapper.resolve_offset(endpoints, topic, i, type, sock_opts) do
      {i, offset}
    end
  end

  @impl true
  def fetch_committed_offsets(_topic, consumer_group, client) do
    {endpoints, sock_opts} = connection(client)

    {:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)

    for r <- response,
        pr <- r[:partitions],
        do: {pr[:partition_index], pr[:committed_offset]}
  end

  @impl true
  def lag(topic, consumer_group, client) do
    offsets =
      resolve_offsets(topic, :latest, client)
      |> Enum.sort_by(fn {key, _value} -> key end)

    committed_offsets =
      fetch_committed_offsets(topic, consumer_group, client)
      |> Enum.sort_by(fn {key, _value} -> key end)

    for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
      {part, current - committed}
    end
  end

  @impl true
  def lag_total(topic, consumer_group, client) do
    for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do
      acc -> acc + recs
    end
  end

  @impl true
  def get_consumer_group_names({host, port}) do
    [{_, groups} | _] = KafkaWrapper.list_all_groups([{host, port}], [])
  def get_consumer_group_names(endpoint) do
    [{_, groups} | _] = KafkaWrapper.list_all_groups([endpoint], [])

    groups
    |> Enum.filter(fn {_, _, protocol} -> protocol === "consumer" end)
@@ -92,25 +44,69 @@ defmodule KafkaexLagExporter.KafkaUtils do

    group_descriptions
    |> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
      get_member_info(members)
      |> Enum.map(fn {topic_names, consumer_id, member_host} ->
        {consumer_group, topic_names, consumer_id, member_host}
      end)
      get_member_info(consumer_group, members)
    end)
  end

  @impl true
  def lag(topic, consumer_group, client) do
    offsets =
      resolve_offsets(topic, client)
      |> Enum.sort_by(fn {key, _value} -> key end)

    committed_offsets =
      fetch_committed_offsets(topic, consumer_group, client)
      |> Enum.sort_by(fn {key, _value} -> key end)

    for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
      {part, current - committed}
    end
  end

  @spec resolve_offsets(binary, atom) :: list({non_neg_integer, integer})
  defp resolve_offsets(topic, client) do
    {endpoints, sock_opts} = connection(client)

    {:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)

    for i <- Range.new(0, partitions_count - 1),
        {:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
      {i, offset}
    end
  end

  @spec fetch_committed_offsets(binary, binary, atom) ::
          list({non_neg_integer, non_neg_integer})
  defp fetch_committed_offsets(_topic, consumer_group, client) do
    {endpoints, sock_opts} = connection(client)

    {:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)

    for r <- response, pr <- r[:partitions] do
      {pr[:partition_index], pr[:committed_offset]}
    end
  end

  @spec get_member_info(
          binary,
          list(%{client_host: binary, member_assignment: binary, member_id: binary})
        ) ::
          list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
  defp get_member_info(members) do
    Enum.map(members, fn %{
          list(
            {consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
             member_host :: binary}
          )
  defp get_member_info(consumer_group, members) do
    members
    |> Enum.map(fn %{
                     client_host: member_host,
                     member_assignment: member_assignment,
                     member_id: consumer_id
                   } ->
      topic_names = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
      {topic_names, consumer_id, member_host}
      topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
      {topics, consumer_id, member_host}
    end)
    |> Enum.map(fn {topics, consumer_id, member_host} ->
      {consumer_group, topics, consumer_id, member_host}
    end)
  end
end
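One detail worth noting in `lag/3`: both offset lists are sorted by partition before zipping, because `Enum.zip/2` pairs elements positionally; unsorted inputs would subtract committed offsets across different partitions. For example:

```elixir
latest = [{1, 50}, {0, 50}]
committed = [{0, 23}, {1, 42}]

# Unsorted: partition 1's latest offset gets paired with partition 0's
# committed offset, so the computed lags are wrong.
Enum.zip(latest, committed)
# => [{{1, 50}, {0, 23}}, {{0, 50}, {1, 42}}]

# Sorted by partition, the pairing (and the subtraction) lines up:
by_partition = fn pairs -> Enum.sort_by(pairs, fn {part, _offset} -> part end) end
Enum.zip(by_partition.(latest), by_partition.(committed))
# => [{{0, 50}, {0, 23}}, {{1, 50}, {1, 42}}]
```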
@@ -36,9 +36,13 @@ defmodule KafkaexLagExporter.Metrics do
    )
  end

  @spec group_sum_lag(
          KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
          list(ConsumerOffset.t())
        ) :: :ok
  @doc false
  def group_sum_lag({host, _port}, cunsumer_offsets) do
    Enum.each(cunsumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
  def group_sum_lag({host, _port}, consumer_offsets) do
    Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
      lag = elem(consumer_offset.lag, 1)

      :telemetry.execute(
@@ -55,6 +59,10 @@ defmodule KafkaexLagExporter.Metrics do
    end)
  end

  @spec group_lag_per_partition(
          KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint(),
          list(ConsumerOffset.t())
        ) :: :ok
  @doc false
  def group_lag_per_partition({host, _port}, consumer_offsets) do
    Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
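Each `ConsumerOffset` is flattened into a `:telemetry.execute/3` call; some reporter (not visible in this compare) translates those events into the Prometheus endpoint. The event name and metadata keys below are assumptions for illustration only:

```elixir
# Hypothetical event shape; the actual names are truncated out of this view.
:telemetry.execute(
  [:kafkaex_lag_exporter, :group, :sum_lag],
  %{lag: 35},
  %{cluster_name: "localhost", group: "my-group", topic: "my-topic"}
)
```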
@@ -40,9 +40,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
  end

  test "should return the calculated lags" do
    test_endpoint = {"test endpoint", 666}

    %{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint)
    %{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get({"test endpoint", 666})

    assert sum == [
      %ConsumerOffset{
@@ -5,9 +5,11 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
  alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
  alias KafkaexLagExporter.KafkaUtils

  @test_endpoint "test_host:1234"
  @test_consumer_group "test-consumer-group"
  @test_endpoint {"test_host", "1234"}
  @test_sock_opts [ssl: []]
  @test_group_name1 "test-consumer_group1"
  @test_group_name2 "test-consumer_group"
  @test_topic_name "test-topic_name"

  setup do
    patch(
@@ -19,86 +21,175 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
    :ok
  end

  describe "resolve_offsets" do
  describe "get_consumer_group_names" do
    setup do
      client = :client2
      topic = "test-topic"
      consumer_info1 = {:a, @test_group_name1, "consumer"}
      consumer_info2 = {:b, @test_group_name2, "something-other"}

      patch(
        KafkaWrapper,
        :get_partitions_count,
        fn _, _ -> {:ok, 3} end
      patch(KafkaWrapper, :list_all_groups, fn _, _ ->
        [{@test_endpoint, [consumer_info1, consumer_info2]}]
      end)

      group_names = KafkaUtils.get_consumer_group_names(@test_endpoint)

      [group_names: group_names]
    end

    test "should list all groups" do
      assert_called(KafkaWrapper.list_all_groups([@test_endpoint], []))
    end

    test "should filter for consumer groups and return names", context do
      assert context[:group_names] === [@test_group_name1]
    end
  end

  describe "get_consumer_group_info" do
    setup do
      consumer_group_name = "test-group-id1"
      member_assignment = "test_member_assignment"
      consumer_id = "test_consumer_id"
      member_host = "test_member_host"

      patch(KafkaWrapper, :describe_groups, fn _, _, _ ->
        {
          :ok,
          [
            %{
              group_id: consumer_group_name,
              members: [
                %{
                  client_host: member_host,
                  member_assignment: member_assignment,
                  member_id: consumer_id
                }
              ]
            }
          ]
        }
      end)

      patch(KafkaexLagExporter.TopicNameParser, :parse_topic_names, fn _ -> [@test_topic_name] end)

      group_info =
        KafkaUtils.get_consumer_group_info(@test_endpoint, [], [
          @test_group_name1,
          @test_group_name2
        ])

      [
        member_host: member_host,
        group_info: group_info,
        consumer_group_name: consumer_group_name,
        member_assignment: member_assignment,
        consumer_id: consumer_id
      ]
    end

    test "should describe groups" do
      assert_called(
        KafkaWrapper.describe_groups(@test_endpoint, [], [@test_group_name1, @test_group_name2])
      )
    end

      patch(
        KafkaWrapper,
        :resolve_offset,
        fn _, _, _, _, _ -> {:ok, 42} end
    test "should parse topic names", context do
      expected_member_assignment = context[:expected_member_assignment]

      assert_called(
        KafkaexLagExporter.TopicNameParser.parse_topic_names(expected_member_assignment)
      )
    end

      offsets = KafkaUtils.resolve_offsets(topic, :earliest, client)
    test "should filter for consumer groups and return names", context do
      expected_consumer_group_name = context[:consumer_group_name]
      expected_consumer_id = context[:consumer_id]
      expected_member_host = context[:member_host]

      [client: client, offsets: offsets, topic: topic]
      assert context[:group_info] === [
        {
          expected_consumer_group_name,
          [@test_topic_name],
          expected_consumer_id,
          expected_member_host
        }
      ]
    end
  end

  describe "lag" do
    setup do
      consumer_group = "test-consumer-group"
      client = :client1
      committed_offset1 = 23
      committed_offset2 = 42
      resolved_offset = 50

      partition_info1 = %{partition_index: 0, committed_offset: committed_offset1}
      partition_info2 = %{partition_index: 1, committed_offset: committed_offset2}

      patch(KafkaWrapper, :get_partitions_count, fn _, _ -> {:ok, 2} end)

      patch(KafkaWrapper, :resolve_offset, fn _, _, _, _, _ -> {:ok, resolved_offset} end)

      patch(KafkaWrapper, :fetch_committed_offsets, fn _, _, _ ->
        {:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
      end)

      lag = KafkaUtils.lag(@test_topic_name, consumer_group, client)

      [
        lag: lag,
        consumer_group: consumer_group,
        client: client,
        committed_offset1: committed_offset1,
        committed_offset2: committed_offset2,
        resolved_offset: resolved_offset
      ]
    end

    test "should start connection", context do
      expected_client = context[:client]

      assert_called(KafkaUtils.connection(expected_client))
    end

    test "should get partition count", context do
      expected_topic = context[:topic]
      expected_client = context[:client]

      assert_called(KafkaWrapper.get_partitions_count(expected_client, expected_topic))
      assert_called(KafkaWrapper.get_partitions_count(expected_client, @test_topic_name))
    end

    test "should resolve offset per partition", context do
      expected_topic = context[:topic]

    test "should resolve offsets for each partition" do
      assert_called(
        KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 0, :earliest, @test_sock_opts)
        KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 0, :latest, @test_sock_opts)
      )

      assert_called(
        KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 1, :earliest, @test_sock_opts)
      )

      assert_called(
        KafkaWrapper.resolve_offset(@test_endpoint, expected_topic, 2, :earliest, @test_sock_opts)
        KafkaWrapper.resolve_offset(@test_endpoint, @test_topic_name, 1, :latest, @test_sock_opts)
      )
    end

    test "should return offsets", context do
      assert context[:offsets] == [{0, 42}, {1, 42}, {2, 42}]
    end
  end
    test "should fetch committed offsets", context do
      expected_consumer_group = context[:consumer_group]

  describe "fetch_committed_offsets" do
    setup do
      partition_info1 = %{partition_index: 0, committed_offset: 23}
      partition_info2 = %{partition_index: 1, committed_offset: 42}

      patch(
        KafkaWrapper,
        :fetch_committed_offsets,
        fn _, _, _ ->
          {:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
        end
      )

      offsets = KafkaUtils.fetch_committed_offsets("test-topic", @test_consumer_group, :test_atom)

      [offsets: offsets]
    end

    test "should get the committed offsets from KafkaWrapper" do
      assert_called(
        KafkaWrapper.fetch_committed_offsets(
          @test_endpoint,
          @test_sock_opts,
          @test_consumer_group
          expected_consumer_group
        )
      )
    end

    test "should return offsets", context do
      assert context[:offsets] == [{0, 23}, {1, 42}]
    test "should return calculated lag per partition", context do
      expected_lag_partition_1 = context[:resolved_offset] - context[:committed_offset1]
      expected_lag_partition_2 = context[:resolved_offset] - context[:committed_offset2]

      assert context[:lag] === [
        {0, expected_lag_partition_1},
        {1, expected_lag_partition_2}
      ]
    end
  end
end
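The rewritten tests follow the same arrange-in-setup pattern throughout: stub the wrapper boundary, call the unit under test, and assert on both the interactions and the returned data. The `patch/3` and `assert_called/1` helpers look like the Patch library (an assumption; the test-helper setup is not part of this compare). A condensed sketch of the pattern under that assumption:

```elixir
defmodule MyApp.Wrapper do
  # Real implementation; tests stub it out per-test.
  def fetch(_arg), do: {:error, :not_stubbed}
end

defmodule MyApp.WrapperTest do
  use ExUnit.Case, async: false
  use Patch

  test "stubbed boundary is observed and asserted" do
    # Replace the real function with a stub for this test only.
    patch(MyApp.Wrapper, :fetch, fn _arg -> {:ok, 42} end)

    assert MyApp.Wrapper.fetch(:anything) == {:ok, 42}

    # Verify the code under test actually hit the boundary.
    assert_called(MyApp.Wrapper.fetch(:anything))
  end
end
```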