Compare commits
10 Commits
0016cd0f74
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 29c032cb16 | |||
| a34e91770f | |||
| 98e79b927d | |||
| 9e9676d4e4 | |||
| bf8389b4e0 | |||
| ee5b69f646 | |||
| 7104414bc1 | |||
| d580b0f1d0 | |||
| 0312a1e37c | |||
| c98e8da2ec |
@@ -1,4 +1,2 @@
|
|||||||
elixir 1.16.2-otp-26
|
elixir 1.16.2-otp-26
|
||||||
erlang 26.0.2
|
erlang 26.0.2
|
||||||
tilt 0.33.11
|
|
||||||
kind 0.22.0
|
|
||||||
|
|||||||
12
LICENSE
Normal file
12
LICENSE
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
ISC License
|
||||||
|
|
||||||
|
Copyright 2024 Pascal Schmid
|
||||||
|
|
||||||
|
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
|
||||||
|
granted, provided that the above copyright notice and this permission notice appear in all copies.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
|
||||||
|
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
|
||||||
|
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
|
||||||
|
OF THIS SOFTWARE.
|
||||||
34
README.md
34
README.md
@@ -1,30 +1,43 @@
|
|||||||
# KafkaexLagExporter
|
# KafkaExLagExporter
|
||||||
|
|
||||||
This project will collect Kafka consumer lag and provide them via Prometheus.
|
This project will collect Kafka consumer lag and provide them via Prometheus.
|
||||||
|
|
||||||
## Metrics
|
## Metrics
|
||||||
|
|
||||||
[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner. KafkaexLagExporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus.
|
[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner.
|
||||||
|
KafkaExLagExporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus.
|
||||||
|
|
||||||
**`kafka_consumergroup_group_topic_sum_lag`**
|
**`kafka_consumergroup_group_topic_sum_lag`**
|
||||||
|
|
||||||
Labels: `cluster_name, group, topic, consumer_id, member_host`
|
Labels: `cluster_name, group, topic, consumer_id, member_host`
|
||||||
|
|
||||||
The sum of the difference between the last produced offset and the last consumed offset of all partitions in this topic for this group.
|
The sum of the difference between the last produced offset and the last consumed offset of all partitions in this
|
||||||
|
topic for this group.
|
||||||
|
|
||||||
**`kafka_consumergroup_group_lag`**
|
**`kafka_consumergroup_group_lag`**
|
||||||
|
|
||||||
Labels: `cluster_name, group, partition, topic, member_host, consumer_id`
|
Labels: `cluster_name, group, partition, topic, member_host, consumer_id`
|
||||||
|
|
||||||
The difference between the last produced offset and the last consumed offset for this partition in this topic partition for this group.
|
The difference between the last produced offset and the last consumed offset for this partition in this topic
|
||||||
|
partition for this group.
|
||||||
|
|
||||||
## Start
|
## Start
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 lechindianer/kafkaex_lag_exporter:0.2.0
|
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
|
||||||
```
|
```
|
||||||
|
|
||||||
Now you can check the exposed metrics at [localhost:4000](localhost:4000).
|
Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
KafkaExLagExporter uses 5 seconds as default interval to update the lags. If you want to configure it to use another
|
||||||
|
value set `KAFKA_EX_INTERVAL_MS`, i.e.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
|
||||||
|
lechindianer/kafkaex_lag_exporter:0.2.0
|
||||||
|
```
|
||||||
|
|
||||||
## Developing
|
## Developing
|
||||||
|
|
||||||
@@ -34,14 +47,14 @@ To start the project locally:
|
|||||||
KAFKA_BROKERS="localhost:9092" iex -S mix
|
KAFKA_BROKERS="localhost:9092" iex -S mix
|
||||||
```
|
```
|
||||||
|
|
||||||
There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
|
There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
|
||||||
KafkaexLagExporter:
|
KafkaexLagExporter:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
docker-compose up --build
|
docker compose up --build
|
||||||
```
|
```
|
||||||
|
|
||||||
Kowl is served at [localhost:8080](localhost:8080).
|
Kowl is served at [http://localhost:8080](http://localhost:8080).
|
||||||
|
|
||||||
### Tests
|
### Tests
|
||||||
|
|
||||||
@@ -67,4 +80,5 @@ mix dialyzer
|
|||||||
|
|
||||||
Source is on [Gitlab](https://gitlab.com/lechindianer/kafkaex-lag-exporter).
|
Source is on [Gitlab](https://gitlab.com/lechindianer/kafkaex-lag-exporter).
|
||||||
|
|
||||||
The initial project [Kafka Lag Exporter](https://github.com/seglo/kafka-lag-exporter) was a huge inspiration for me creating my first real Elixir project. Thank you!
|
The initial project [Kafka Lag Exporter](https://github.com/seglo/kafka-lag-exporter) was a huge inspiration for me
|
||||||
|
creating my first real Elixir project. Thank you!
|
||||||
|
|||||||
@@ -1,8 +0,0 @@
|
|||||||
kafka:
|
|
||||||
brokers:
|
|
||||||
- kafka1:9092
|
|
||||||
- kafka2:9092
|
|
||||||
- kafka3:9092
|
|
||||||
|
|
||||||
# server:
|
|
||||||
# listenPort: 8080
|
|
||||||
@@ -32,7 +32,7 @@ services:
|
|||||||
- redpanda_network
|
- redpanda_network
|
||||||
|
|
||||||
console:
|
console:
|
||||||
image: docker.redpanda.com/redpandadata/console:v2.2.4
|
image: docker.redpanda.com/redpandadata/console:v2.4.6
|
||||||
entrypoint: /bin/sh
|
entrypoint: /bin/sh
|
||||||
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
|
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
|
||||||
environment:
|
environment:
|
||||||
@@ -63,7 +63,6 @@ services:
|
|||||||
image: quay.io/cloudhut/owl-shop:latest
|
image: quay.io/cloudhut/owl-shop:latest
|
||||||
networks:
|
networks:
|
||||||
- redpanda_network
|
- redpanda_network
|
||||||
#platform: 'linux/amd64'
|
|
||||||
environment:
|
environment:
|
||||||
- SHOP_KAFKA_BROKERS=redpanda:29092
|
- SHOP_KAFKA_BROKERS=redpanda:29092
|
||||||
- SHOP_KAFKA_TOPICREPLICATIONFACTOR=1
|
- SHOP_KAFKA_TOPICREPLICATIONFACTOR=1
|
||||||
@@ -78,7 +77,6 @@ services:
|
|||||||
container_name: connect
|
container_name: connect
|
||||||
networks:
|
networks:
|
||||||
- redpanda_network
|
- redpanda_network
|
||||||
#platform: 'linux/amd64'
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- redpanda
|
- redpanda
|
||||||
ports:
|
ports:
|
||||||
@@ -109,6 +107,8 @@ services:
|
|||||||
- '4000:4000'
|
- '4000:4000'
|
||||||
environment:
|
environment:
|
||||||
- KAFKA_BROKERS=redpanda:29092
|
- KAFKA_BROKERS=redpanda:29092
|
||||||
|
networks:
|
||||||
|
- redpanda_network
|
||||||
depends_on:
|
depends_on:
|
||||||
- redpanda
|
- redpanda
|
||||||
restart: "unless-stopped"
|
restart: "unless-stopped"
|
||||||
|
|||||||
@@ -1,15 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
|
|
||||||
kind delete cluster
|
|
||||||
|
|
||||||
kind create cluster --config kind.yaml
|
|
||||||
|
|
||||||
kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m
|
|
||||||
|
|
||||||
kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -
|
|
||||||
|
|
||||||
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml
|
|
||||||
|
|
||||||
LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
|
|
||||||
|
|
||||||
echo "address=/kind.cluster/$LB_IP"
|
|
||||||
24
kind.yaml
24
kind.yaml
@@ -1,24 +0,0 @@
|
|||||||
kind: Cluster
|
|
||||||
apiVersion: kind.x-k8s.io/v1alpha4
|
|
||||||
nodes:
|
|
||||||
- role: control-plane
|
|
||||||
kubeadmConfigPatches:
|
|
||||||
- |
|
|
||||||
kind: InitConfiguration
|
|
||||||
nodeRegistration:
|
|
||||||
kubeletExtraArgs:
|
|
||||||
node-labels: "ingress-ready=true"
|
|
||||||
extraPortMappings:
|
|
||||||
- containerPort: 80
|
|
||||||
hostPort: 80
|
|
||||||
protocol: TCP
|
|
||||||
- containerPort: 443
|
|
||||||
hostPort: 443
|
|
||||||
protocol: TCP
|
|
||||||
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
|
|
||||||
- role: worker
|
|
||||||
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
|
|
||||||
- role: worker
|
|
||||||
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
|
|
||||||
- role: worker
|
|
||||||
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
|
|
||||||
k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
|
|
||||||
k8s_yaml('./redpanda-deployment.yaml')
|
|
||||||
k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
|
|
||||||
k8s_yaml('./redpanda-service.yaml')
|
|
||||||
|
|
||||||
k8s_yaml('./connect-service.yaml')
|
|
||||||
k8s_yaml('./connect-deployment.yaml')
|
|
||||||
|
|
||||||
k8s_yaml('./owl-shop-deployment.yaml')
|
|
||||||
|
|
||||||
k8s_yaml('./console-deployment.yaml')
|
|
||||||
k8s_yaml('./console-service.yaml')
|
|
||||||
k8s_yaml('./console-ingress.yaml')
|
|
||||||
|
|
||||||
docker_build('kafka-lag-exporter', './..')
|
|
||||||
|
|
||||||
k8s_yaml('./kafka-lag-exporter-deployment.yaml')
|
|
||||||
k8s_yaml('./kafka-lag-exporter-service.yaml')
|
|
||||||
|
|
||||||
k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)
|
|
||||||
@@ -1,48 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
creationTimestamp: null
|
|
||||||
labels:
|
|
||||||
io.kompose.service: connect
|
|
||||||
name: connect
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: connect
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
io.kompose.service: connect
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- env:
|
|
||||||
- name: CONNECT_BOOTSTRAP_SERVERS
|
|
||||||
value: redpanda:29092
|
|
||||||
- name: CONNECT_CONFIGURATION
|
|
||||||
value: |
|
|
||||||
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
|
|
||||||
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
|
|
||||||
group.id=connectors-cluster
|
|
||||||
offset.storage.topic=_internal_connectors_offsets
|
|
||||||
config.storage.topic=_internal_connectors_configs
|
|
||||||
status.storage.topic=_internal_connectors_status
|
|
||||||
config.storage.replication.factor=-1
|
|
||||||
offset.storage.replication.factor=-1
|
|
||||||
status.storage.replication.factor=-1
|
|
||||||
offset.flush.interval.ms=1000
|
|
||||||
producer.linger.ms=50
|
|
||||||
producer.batch.size=131072
|
|
||||||
- name: CONNECT_GC_LOG_ENABLED
|
|
||||||
value: "false"
|
|
||||||
- name: CONNECT_HEAP_OPTS
|
|
||||||
value: -Xms512M -Xmx512M
|
|
||||||
- name: CONNECT_LOG_LEVEL
|
|
||||||
value: info
|
|
||||||
image: docker.redpanda.com/redpandadata/connectors:latest
|
|
||||||
name: connect
|
|
||||||
ports:
|
|
||||||
- containerPort: 8083
|
|
||||||
hostname: connect
|
|
||||||
restartPolicy: Always
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: Service
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: connect
|
|
||||||
name: connect
|
|
||||||
spec:
|
|
||||||
ports:
|
|
||||||
- name: "8083"
|
|
||||||
port: 8083
|
|
||||||
targetPort: 8083
|
|
||||||
selector:
|
|
||||||
io.kompose.service: connect
|
|
||||||
@@ -1,48 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
creationTimestamp: null
|
|
||||||
labels:
|
|
||||||
io.kompose.service: console
|
|
||||||
name: console
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: console
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
io.kompose.service: console
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- args:
|
|
||||||
- -c
|
|
||||||
- echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
|
|
||||||
command:
|
|
||||||
- /bin/sh
|
|
||||||
env:
|
|
||||||
- name: CONFIG_FILEPATH
|
|
||||||
value: /tmp/config.yml
|
|
||||||
- name: CONSOLE_CONFIG_FILE
|
|
||||||
value: |
|
|
||||||
kafka:
|
|
||||||
brokers: ["redpanda:29092"]
|
|
||||||
schemaRegistry:
|
|
||||||
enabled: true
|
|
||||||
urls: ["http://redpanda:8081"]
|
|
||||||
redpanda:
|
|
||||||
adminApi:
|
|
||||||
enabled: true
|
|
||||||
urls: ["http://redpanda:9644"]
|
|
||||||
connect:
|
|
||||||
enabled: true
|
|
||||||
clusters:
|
|
||||||
- name: local-connect-cluster
|
|
||||||
url: http://connect:8083
|
|
||||||
image: docker.redpanda.com/redpandadata/console:v2.2.4
|
|
||||||
name: console
|
|
||||||
ports:
|
|
||||||
- containerPort: 8080
|
|
||||||
restartPolicy: Always
|
|
||||||
@@ -1,17 +0,0 @@
|
|||||||
apiVersion: networking.k8s.io/v1
|
|
||||||
kind: Ingress
|
|
||||||
metadata:
|
|
||||||
name: ingress-myserviceb
|
|
||||||
spec:
|
|
||||||
rules:
|
|
||||||
- host: console.lechindianer.hack
|
|
||||||
http:
|
|
||||||
paths:
|
|
||||||
- path: /
|
|
||||||
pathType: Prefix
|
|
||||||
backend:
|
|
||||||
service:
|
|
||||||
name: console
|
|
||||||
port:
|
|
||||||
number: 8080
|
|
||||||
ingressClassName: nginx
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: Service
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: console
|
|
||||||
name: console
|
|
||||||
spec:
|
|
||||||
ports:
|
|
||||||
- name: "8080"
|
|
||||||
port: 8080
|
|
||||||
targetPort: 8080
|
|
||||||
selector:
|
|
||||||
io.kompose.service: console
|
|
||||||
@@ -1,26 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
name: kafka-lag-exporter
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-default: "true"
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- env:
|
|
||||||
- name: KAFKA_BROKERS
|
|
||||||
value: redpanda:29092
|
|
||||||
image: kafka-lag-exporter
|
|
||||||
name: kafka-lag-exporter
|
|
||||||
ports:
|
|
||||||
- containerPort: 4000
|
|
||||||
restartPolicy: Always
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: Service
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
name: kafka-lag-exporter
|
|
||||||
spec:
|
|
||||||
ports:
|
|
||||||
- name: "4000"
|
|
||||||
port: 4000
|
|
||||||
targetPort: 4000
|
|
||||||
selector:
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
@@ -1,31 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: owl-shop
|
|
||||||
name: owl-shop
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: owl-shop
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
creationTimestamp: null
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
io.kompose.service: owl-shop
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- env:
|
|
||||||
- name: SHOP_KAFKA_BROKERS
|
|
||||||
value: redpanda:29092
|
|
||||||
- name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
|
|
||||||
value: "1"
|
|
||||||
- name: SHOP_TRAFFIC_INTERVAL_DURATION
|
|
||||||
value: 0.1s
|
|
||||||
- name: SHOP_TRAFFIC_INTERVAL_RATE
|
|
||||||
value: "6"
|
|
||||||
image: quay.io/cloudhut/owl-shop:latest
|
|
||||||
name: owl-shop
|
|
||||||
restartPolicy: Always
|
|
||||||
@@ -1,44 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
name: redpanda
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
strategy:
|
|
||||||
type: Recreate
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- args:
|
|
||||||
- redpanda start
|
|
||||||
- --smp 1
|
|
||||||
- --overprovisioned
|
|
||||||
- --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
|
|
||||||
- --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
|
|
||||||
- --pandaproxy-addr 0.0.0.0:8082
|
|
||||||
- --advertise-pandaproxy-addr localhost:8082
|
|
||||||
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
|
|
||||||
name: redpanda
|
|
||||||
ports:
|
|
||||||
- containerPort: 8081
|
|
||||||
- containerPort: 8082
|
|
||||||
- containerPort: 9092
|
|
||||||
- containerPort: 9644
|
|
||||||
- containerPort: 29092
|
|
||||||
volumeMounts:
|
|
||||||
- mountPath: /var/lib/redpanda/data
|
|
||||||
name: redpanda
|
|
||||||
restartPolicy: Always
|
|
||||||
volumes:
|
|
||||||
- name: redpanda
|
|
||||||
persistentVolumeClaim:
|
|
||||||
claimName: redpanda
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: networking.k8s.io/v1
|
|
||||||
kind: NetworkPolicy
|
|
||||||
metadata:
|
|
||||||
name: redpanda-owl-shop-default
|
|
||||||
spec:
|
|
||||||
ingress:
|
|
||||||
- from:
|
|
||||||
- podSelector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-default: "true"
|
|
||||||
podSelector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-default: "true"
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: networking.k8s.io/v1
|
|
||||||
kind: NetworkPolicy
|
|
||||||
metadata:
|
|
||||||
name: redpanda-owl-shop-redpanda-network
|
|
||||||
spec:
|
|
||||||
ingress:
|
|
||||||
- from:
|
|
||||||
- podSelector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
podSelector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: PersistentVolumeClaim
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
name: redpanda
|
|
||||||
spec:
|
|
||||||
accessModes:
|
|
||||||
- ReadWriteOnce
|
|
||||||
resources:
|
|
||||||
requests:
|
|
||||||
storage: 100Mi
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: Service
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
name: redpanda
|
|
||||||
spec:
|
|
||||||
ports:
|
|
||||||
- name: "8081"
|
|
||||||
port: 8081
|
|
||||||
targetPort: 8081
|
|
||||||
- name: "8082"
|
|
||||||
port: 8082
|
|
||||||
targetPort: 8082
|
|
||||||
- name: "9092"
|
|
||||||
port: 9092
|
|
||||||
targetPort: 9092
|
|
||||||
- name: "9644"
|
|
||||||
port: 9644
|
|
||||||
targetPort: 9644
|
|
||||||
- name: "29092"
|
|
||||||
port: 29092
|
|
||||||
targetPort: 29092
|
|
||||||
selector:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
@@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
|
|||||||
|
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
@interval 5_000
|
|
||||||
|
|
||||||
def start_link(default) when is_list(default) do
|
def start_link(default) when is_list(default) do
|
||||||
GenServer.start_link(__MODULE__, default, name: __MODULE__)
|
GenServer.start_link(__MODULE__, default, name: __MODULE__)
|
||||||
end
|
end
|
||||||
@@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
|
|||||||
clients = Application.get_env(:brod, :clients)
|
clients = Application.get_env(:brod, :clients)
|
||||||
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
|
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
|
||||||
|
|
||||||
|
interval =
|
||||||
|
System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
|
||||||
|
|> String.to_integer()
|
||||||
|
|
||||||
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
|
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
|
||||||
|
Logger.info("Updating lag information every #{interval} milliseconds")
|
||||||
|
|
||||||
Process.send_after(self(), :tick, @interval)
|
Process.send_after(self(), :tick, interval)
|
||||||
|
|
||||||
{:ok, %{endpoints: endpoints}}
|
{:ok, %{endpoints: endpoints, interval: interval}}
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_info(:tick, state) do
|
def handle_info(:tick, state) do
|
||||||
[endpoint | _] = state.endpoints
|
[endpoint | _] = state.endpoints
|
||||||
|
interval = state.interval
|
||||||
|
|
||||||
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
||||||
|
|
||||||
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
|
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
|
||||||
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
|
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
|
||||||
|
|
||||||
Process.send_after(self(), :tick, @interval)
|
Process.send_after(self(), :tick, interval)
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -7,8 +7,6 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
|
|
||||||
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
|
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
|
||||||
|
|
||||||
require Logger
|
|
||||||
|
|
||||||
@default_client :client1
|
@default_client :client1
|
||||||
|
|
||||||
def connection, do: connection(@default_client)
|
def connection, do: connection(@default_client)
|
||||||
@@ -46,10 +44,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
|
|
||||||
group_descriptions
|
group_descriptions
|
||||||
|> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
|
|> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
|
||||||
get_member_info(members)
|
get_member_info(consumer_group, members)
|
||||||
|> Enum.map(fn {topics, consumer_id, member_host} ->
|
|
||||||
{consumer_group, topics, consumer_id, member_host}
|
|
||||||
end)
|
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -63,7 +58,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
fetch_committed_offsets(topic, consumer_group, client)
|
fetch_committed_offsets(topic, consumer_group, client)
|
||||||
|> Enum.sort_by(fn {key, _value} -> key end)
|
|> Enum.sort_by(fn {key, _value} -> key end)
|
||||||
|
|
||||||
for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
|
for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
|
||||||
{part, current - committed}
|
{part, current - committed}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -75,8 +70,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
|
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
|
||||||
|
|
||||||
for i <- Range.new(0, partitions_count - 1),
|
for i <- Range.new(0, partitions_count - 1),
|
||||||
{:ok, offset} =
|
{:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
|
||||||
KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
|
|
||||||
{i, offset}
|
{i, offset}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -88,23 +82,31 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
|
|
||||||
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
|
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
|
||||||
|
|
||||||
for r <- response,
|
for r <- response, pr <- r[:partitions] do
|
||||||
pr <- r[:partitions],
|
{pr[:partition_index], pr[:committed_offset]}
|
||||||
do: {pr[:partition_index], pr[:committed_offset]}
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@spec get_member_info(
|
@spec get_member_info(
|
||||||
|
binary,
|
||||||
list(%{client_host: binary, member_assignment: binary, member_id: binary})
|
list(%{client_host: binary, member_assignment: binary, member_id: binary})
|
||||||
) ::
|
) ::
|
||||||
list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
|
list(
|
||||||
defp get_member_info(members) do
|
{consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
|
||||||
Enum.map(members, fn %{
|
member_host :: binary}
|
||||||
client_host: member_host,
|
)
|
||||||
member_assignment: member_assignment,
|
defp get_member_info(consumer_group, members) do
|
||||||
member_id: consumer_id
|
members
|
||||||
} ->
|
|> Enum.map(fn %{
|
||||||
|
client_host: member_host,
|
||||||
|
member_assignment: member_assignment,
|
||||||
|
member_id: consumer_id
|
||||||
|
} ->
|
||||||
topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
|
topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
|
||||||
{topics, consumer_id, member_host}
|
{topics, consumer_id, member_host}
|
||||||
end)
|
end)
|
||||||
|
|> Enum.map(fn {topics, consumer_id, member_host} ->
|
||||||
|
{consumer_group, topics, consumer_id, member_host}
|
||||||
|
end)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -40,9 +40,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
|
|||||||
end
|
end
|
||||||
|
|
||||||
test "should return the calculated lags" do
|
test "should return the calculated lags" do
|
||||||
test_endpoint = {"test endpoint", 666}
|
%{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get({"test endpoint", 666})
|
||||||
|
|
||||||
%{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint)
|
|
||||||
|
|
||||||
assert sum == [
|
assert sum == [
|
||||||
%ConsumerOffset{
|
%ConsumerOffset{
|
||||||
|
|||||||
@@ -5,9 +5,7 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
|
|||||||
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
|
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
|
||||||
alias KafkaexLagExporter.KafkaUtils
|
alias KafkaexLagExporter.KafkaUtils
|
||||||
|
|
||||||
@test_host "test_host"
|
@test_endpoint {"test_host", "1234"}
|
||||||
@test_port "1234"
|
|
||||||
@test_endpoint {@test_host, @test_port}
|
|
||||||
@test_sock_opts [ssl: []]
|
@test_sock_opts [ssl: []]
|
||||||
@test_group_name1 "test-consumer_group1"
|
@test_group_name1 "test-consumer_group1"
|
||||||
@test_group_name2 "test-consumer_group"
|
@test_group_name2 "test-consumer_group"
|
||||||
@@ -28,18 +26,9 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
|
|||||||
consumer_info1 = {:a, @test_group_name1, "consumer"}
|
consumer_info1 = {:a, @test_group_name1, "consumer"}
|
||||||
consumer_info2 = {:b, @test_group_name2, "something-other"}
|
consumer_info2 = {:b, @test_group_name2, "something-other"}
|
||||||
|
|
||||||
patch(
|
patch(KafkaWrapper, :list_all_groups, fn _, _ ->
|
||||||
KafkaWrapper,
|
[{@test_endpoint, [consumer_info1, consumer_info2]}]
|
||||||
:list_all_groups,
|
end)
|
||||||
fn _, _ ->
|
|
||||||
[
|
|
||||||
{
|
|
||||||
@test_endpoint,
|
|
||||||
[consumer_info1, consumer_info2]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
end
|
|
||||||
)
|
|
||||||
|
|
||||||
group_names = KafkaUtils.get_consumer_group_names(@test_endpoint)
|
group_names = KafkaUtils.get_consumer_group_names(@test_endpoint)
|
||||||
|
|
||||||
@@ -63,27 +52,23 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
|
|||||||
consumer_id = "test_consumer_id"
|
consumer_id = "test_consumer_id"
|
||||||
member_host = "test_member_host"
|
member_host = "test_member_host"
|
||||||
|
|
||||||
patch(
|
patch(KafkaWrapper, :describe_groups, fn _, _, _ ->
|
||||||
KafkaWrapper,
|
{
|
||||||
:describe_groups,
|
:ok,
|
||||||
fn _, _, _ ->
|
[
|
||||||
{
|
%{
|
||||||
:ok,
|
group_id: consumer_group_name,
|
||||||
[
|
members: [
|
||||||
%{
|
%{
|
||||||
group_id: consumer_group_name,
|
client_host: member_host,
|
||||||
members: [
|
member_assignment: member_assignment,
|
||||||
%{
|
member_id: consumer_id
|
||||||
client_host: member_host,
|
}
|
||||||
member_assignment: member_assignment,
|
]
|
||||||
member_id: consumer_id
|
}
|
||||||
}
|
]
|
||||||
]
|
}
|
||||||
}
|
end)
|
||||||
]
|
|
||||||
}
|
|
||||||
end
|
|
||||||
)
|
|
||||||
|
|
||||||
patch(KafkaexLagExporter.TopicNameParser, :parse_topic_names, fn _ -> [@test_topic_name] end)
|
patch(KafkaexLagExporter.TopicNameParser, :parse_topic_names, fn _ -> [@test_topic_name] end)
|
||||||
|
|
||||||
@@ -147,13 +132,9 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
|
|||||||
|
|
||||||
patch(KafkaWrapper, :resolve_offset, fn _, _, _, _, _ -> {:ok, resolved_offset} end)
|
patch(KafkaWrapper, :resolve_offset, fn _, _, _, _, _ -> {:ok, resolved_offset} end)
|
||||||
|
|
||||||
patch(
|
patch(KafkaWrapper, :fetch_committed_offsets, fn _, _, _ ->
|
||||||
KafkaWrapper,
|
{:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
|
||||||
:fetch_committed_offsets,
|
end)
|
||||||
fn _, _, _ ->
|
|
||||||
{:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
|
|
||||||
end
|
|
||||||
)
|
|
||||||
|
|
||||||
lag = KafkaUtils.lag(@test_topic_name, consumer_group, client)
|
lag = KafkaUtils.lag(@test_topic_name, consumer_group, client)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user