Compare commits
4 Commits
7104414bc1
...
98e79b927d
| Author | SHA1 | Date | |
|---|---|---|---|
| 98e79b927d | |||
| 9e9676d4e4 | |||
| bf8389b4e0 | |||
| ee5b69f646 |
@@ -1,4 +1,2 @@
|
|||||||
elixir 1.16.2-otp-26
|
elixir 1.16.2-otp-26
|
||||||
erlang 26.0.2
|
erlang 26.0.2
|
||||||
tilt 0.33.11
|
|
||||||
kind 0.22.0
|
|
||||||
|
|||||||
17
README.md
17
README.md
@@ -24,12 +24,21 @@ partition for this group.
|
|||||||
## Start
|
## Start
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 -p 4000:4000 \
|
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
|
||||||
lechindianer/kafkaex_lag_exporter:0.2.0
|
|
||||||
```
|
```
|
||||||
|
|
||||||
Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).
|
Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
KafkaExLagExporter uses 5 seconds as default interval to update the lags. If you want to configure it to use another
|
||||||
|
value set `KAFKA_EX_INTERVAL_MS`, i.e.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
|
||||||
|
lechindianer/kafkaex_lag_exporter:0.2.0
|
||||||
|
```
|
||||||
|
|
||||||
## Developing
|
## Developing
|
||||||
|
|
||||||
To start the project locally:
|
To start the project locally:
|
||||||
@@ -38,11 +47,11 @@ To start the project locally:
|
|||||||
KAFKA_BROKERS="localhost:9092" iex -S mix
|
KAFKA_BROKERS="localhost:9092" iex -S mix
|
||||||
```
|
```
|
||||||
|
|
||||||
There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
|
There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
|
||||||
KafkaexLagExporter:
|
KafkaexLagExporter:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
docker-compose up --build
|
docker compose up --build
|
||||||
```
|
```
|
||||||
|
|
||||||
Kowl is served at [http://localhost:8080](http://localhost:8080).
|
Kowl is served at [http://localhost:8080](http://localhost:8080).
|
||||||
|
|||||||
@@ -1,8 +0,0 @@
|
|||||||
kafka:
|
|
||||||
brokers:
|
|
||||||
- kafka1:9092
|
|
||||||
- kafka2:9092
|
|
||||||
- kafka3:9092
|
|
||||||
|
|
||||||
# server:
|
|
||||||
# listenPort: 8080
|
|
||||||
@@ -1,15 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
|
|
||||||
kind delete cluster
|
|
||||||
|
|
||||||
kind create cluster --config kind.yaml
|
|
||||||
|
|
||||||
kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m
|
|
||||||
|
|
||||||
kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -
|
|
||||||
|
|
||||||
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml
|
|
||||||
|
|
||||||
LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
|
|
||||||
|
|
||||||
echo "address=/kind.cluster/$LB_IP"
|
|
||||||
24
kind.yaml
24
kind.yaml
@@ -1,24 +0,0 @@
|
|||||||
kind: Cluster
|
|
||||||
apiVersion: kind.x-k8s.io/v1alpha4
|
|
||||||
nodes:
|
|
||||||
- role: control-plane
|
|
||||||
kubeadmConfigPatches:
|
|
||||||
- |
|
|
||||||
kind: InitConfiguration
|
|
||||||
nodeRegistration:
|
|
||||||
kubeletExtraArgs:
|
|
||||||
node-labels: "ingress-ready=true"
|
|
||||||
extraPortMappings:
|
|
||||||
- containerPort: 80
|
|
||||||
hostPort: 80
|
|
||||||
protocol: TCP
|
|
||||||
- containerPort: 443
|
|
||||||
hostPort: 443
|
|
||||||
protocol: TCP
|
|
||||||
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
|
|
||||||
- role: worker
|
|
||||||
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
|
|
||||||
- role: worker
|
|
||||||
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
|
|
||||||
- role: worker
|
|
||||||
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
|
|
||||||
k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
|
|
||||||
k8s_yaml('./redpanda-deployment.yaml')
|
|
||||||
k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
|
|
||||||
k8s_yaml('./redpanda-service.yaml')
|
|
||||||
|
|
||||||
k8s_yaml('./connect-service.yaml')
|
|
||||||
k8s_yaml('./connect-deployment.yaml')
|
|
||||||
|
|
||||||
k8s_yaml('./owl-shop-deployment.yaml')
|
|
||||||
|
|
||||||
k8s_yaml('./console-deployment.yaml')
|
|
||||||
k8s_yaml('./console-service.yaml')
|
|
||||||
k8s_yaml('./console-ingress.yaml')
|
|
||||||
|
|
||||||
docker_build('kafka-lag-exporter', './..')
|
|
||||||
|
|
||||||
k8s_yaml('./kafka-lag-exporter-deployment.yaml')
|
|
||||||
k8s_yaml('./kafka-lag-exporter-service.yaml')
|
|
||||||
|
|
||||||
k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)
|
|
||||||
@@ -1,48 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
creationTimestamp: null
|
|
||||||
labels:
|
|
||||||
io.kompose.service: connect
|
|
||||||
name: connect
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: connect
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
io.kompose.service: connect
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- env:
|
|
||||||
- name: CONNECT_BOOTSTRAP_SERVERS
|
|
||||||
value: redpanda:29092
|
|
||||||
- name: CONNECT_CONFIGURATION
|
|
||||||
value: |
|
|
||||||
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
|
|
||||||
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
|
|
||||||
group.id=connectors-cluster
|
|
||||||
offset.storage.topic=_internal_connectors_offsets
|
|
||||||
config.storage.topic=_internal_connectors_configs
|
|
||||||
status.storage.topic=_internal_connectors_status
|
|
||||||
config.storage.replication.factor=-1
|
|
||||||
offset.storage.replication.factor=-1
|
|
||||||
status.storage.replication.factor=-1
|
|
||||||
offset.flush.interval.ms=1000
|
|
||||||
producer.linger.ms=50
|
|
||||||
producer.batch.size=131072
|
|
||||||
- name: CONNECT_GC_LOG_ENABLED
|
|
||||||
value: "false"
|
|
||||||
- name: CONNECT_HEAP_OPTS
|
|
||||||
value: -Xms512M -Xmx512M
|
|
||||||
- name: CONNECT_LOG_LEVEL
|
|
||||||
value: info
|
|
||||||
image: docker.redpanda.com/redpandadata/connectors:latest
|
|
||||||
name: connect
|
|
||||||
ports:
|
|
||||||
- containerPort: 8083
|
|
||||||
hostname: connect
|
|
||||||
restartPolicy: Always
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: Service
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: connect
|
|
||||||
name: connect
|
|
||||||
spec:
|
|
||||||
ports:
|
|
||||||
- name: "8083"
|
|
||||||
port: 8083
|
|
||||||
targetPort: 8083
|
|
||||||
selector:
|
|
||||||
io.kompose.service: connect
|
|
||||||
@@ -1,48 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
creationTimestamp: null
|
|
||||||
labels:
|
|
||||||
io.kompose.service: console
|
|
||||||
name: console
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: console
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
io.kompose.service: console
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- args:
|
|
||||||
- -c
|
|
||||||
- echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
|
|
||||||
command:
|
|
||||||
- /bin/sh
|
|
||||||
env:
|
|
||||||
- name: CONFIG_FILEPATH
|
|
||||||
value: /tmp/config.yml
|
|
||||||
- name: CONSOLE_CONFIG_FILE
|
|
||||||
value: |
|
|
||||||
kafka:
|
|
||||||
brokers: ["redpanda:29092"]
|
|
||||||
schemaRegistry:
|
|
||||||
enabled: true
|
|
||||||
urls: ["http://redpanda:8081"]
|
|
||||||
redpanda:
|
|
||||||
adminApi:
|
|
||||||
enabled: true
|
|
||||||
urls: ["http://redpanda:9644"]
|
|
||||||
connect:
|
|
||||||
enabled: true
|
|
||||||
clusters:
|
|
||||||
- name: local-connect-cluster
|
|
||||||
url: http://connect:8083
|
|
||||||
image: docker.redpanda.com/redpandadata/console:v2.2.4
|
|
||||||
name: console
|
|
||||||
ports:
|
|
||||||
- containerPort: 8080
|
|
||||||
restartPolicy: Always
|
|
||||||
@@ -1,17 +0,0 @@
|
|||||||
apiVersion: networking.k8s.io/v1
|
|
||||||
kind: Ingress
|
|
||||||
metadata:
|
|
||||||
name: ingress-myserviceb
|
|
||||||
spec:
|
|
||||||
rules:
|
|
||||||
- host: console.lechindianer.hack
|
|
||||||
http:
|
|
||||||
paths:
|
|
||||||
- path: /
|
|
||||||
pathType: Prefix
|
|
||||||
backend:
|
|
||||||
service:
|
|
||||||
name: console
|
|
||||||
port:
|
|
||||||
number: 8080
|
|
||||||
ingressClassName: nginx
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: Service
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: console
|
|
||||||
name: console
|
|
||||||
spec:
|
|
||||||
ports:
|
|
||||||
- name: "8080"
|
|
||||||
port: 8080
|
|
||||||
targetPort: 8080
|
|
||||||
selector:
|
|
||||||
io.kompose.service: console
|
|
||||||
@@ -1,26 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
name: kafka-lag-exporter
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-default: "true"
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- env:
|
|
||||||
- name: KAFKA_BROKERS
|
|
||||||
value: redpanda:29092
|
|
||||||
image: kafka-lag-exporter
|
|
||||||
name: kafka-lag-exporter
|
|
||||||
ports:
|
|
||||||
- containerPort: 4000
|
|
||||||
restartPolicy: Always
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: Service
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
name: kafka-lag-exporter
|
|
||||||
spec:
|
|
||||||
ports:
|
|
||||||
- name: "4000"
|
|
||||||
port: 4000
|
|
||||||
targetPort: 4000
|
|
||||||
selector:
|
|
||||||
io.kompose.service: kafka-lag-exporter
|
|
||||||
@@ -1,31 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: owl-shop
|
|
||||||
name: owl-shop
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: owl-shop
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
creationTimestamp: null
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
io.kompose.service: owl-shop
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- env:
|
|
||||||
- name: SHOP_KAFKA_BROKERS
|
|
||||||
value: redpanda:29092
|
|
||||||
- name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
|
|
||||||
value: "1"
|
|
||||||
- name: SHOP_TRAFFIC_INTERVAL_DURATION
|
|
||||||
value: 0.1s
|
|
||||||
- name: SHOP_TRAFFIC_INTERVAL_RATE
|
|
||||||
value: "6"
|
|
||||||
image: quay.io/cloudhut/owl-shop:latest
|
|
||||||
name: owl-shop
|
|
||||||
restartPolicy: Always
|
|
||||||
@@ -1,44 +0,0 @@
|
|||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
name: redpanda
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
strategy:
|
|
||||||
type: Recreate
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
spec:
|
|
||||||
containers:
|
|
||||||
- args:
|
|
||||||
- redpanda start
|
|
||||||
- --smp 1
|
|
||||||
- --overprovisioned
|
|
||||||
- --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
|
|
||||||
- --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
|
|
||||||
- --pandaproxy-addr 0.0.0.0:8082
|
|
||||||
- --advertise-pandaproxy-addr localhost:8082
|
|
||||||
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
|
|
||||||
name: redpanda
|
|
||||||
ports:
|
|
||||||
- containerPort: 8081
|
|
||||||
- containerPort: 8082
|
|
||||||
- containerPort: 9092
|
|
||||||
- containerPort: 9644
|
|
||||||
- containerPort: 29092
|
|
||||||
volumeMounts:
|
|
||||||
- mountPath: /var/lib/redpanda/data
|
|
||||||
name: redpanda
|
|
||||||
restartPolicy: Always
|
|
||||||
volumes:
|
|
||||||
- name: redpanda
|
|
||||||
persistentVolumeClaim:
|
|
||||||
claimName: redpanda
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: networking.k8s.io/v1
|
|
||||||
kind: NetworkPolicy
|
|
||||||
metadata:
|
|
||||||
name: redpanda-owl-shop-default
|
|
||||||
spec:
|
|
||||||
ingress:
|
|
||||||
- from:
|
|
||||||
- podSelector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-default: "true"
|
|
||||||
podSelector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-default: "true"
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
apiVersion: networking.k8s.io/v1
|
|
||||||
kind: NetworkPolicy
|
|
||||||
metadata:
|
|
||||||
name: redpanda-owl-shop-redpanda-network
|
|
||||||
spec:
|
|
||||||
ingress:
|
|
||||||
- from:
|
|
||||||
- podSelector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
podSelector:
|
|
||||||
matchLabels:
|
|
||||||
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
|
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: PersistentVolumeClaim
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
name: redpanda
|
|
||||||
spec:
|
|
||||||
accessModes:
|
|
||||||
- ReadWriteOnce
|
|
||||||
resources:
|
|
||||||
requests:
|
|
||||||
storage: 100Mi
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: Service
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
name: redpanda
|
|
||||||
spec:
|
|
||||||
ports:
|
|
||||||
- name: "8081"
|
|
||||||
port: 8081
|
|
||||||
targetPort: 8081
|
|
||||||
- name: "8082"
|
|
||||||
port: 8082
|
|
||||||
targetPort: 8082
|
|
||||||
- name: "9092"
|
|
||||||
port: 9092
|
|
||||||
targetPort: 9092
|
|
||||||
- name: "9644"
|
|
||||||
port: 9644
|
|
||||||
targetPort: 9644
|
|
||||||
- name: "29092"
|
|
||||||
port: 29092
|
|
||||||
targetPort: 29092
|
|
||||||
selector:
|
|
||||||
io.kompose.service: redpanda
|
|
||||||
@@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
|
|||||||
|
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
@interval 5_000
|
|
||||||
|
|
||||||
def start_link(default) when is_list(default) do
|
def start_link(default) when is_list(default) do
|
||||||
GenServer.start_link(__MODULE__, default, name: __MODULE__)
|
GenServer.start_link(__MODULE__, default, name: __MODULE__)
|
||||||
end
|
end
|
||||||
@@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
|
|||||||
clients = Application.get_env(:brod, :clients)
|
clients = Application.get_env(:brod, :clients)
|
||||||
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
|
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
|
||||||
|
|
||||||
|
interval =
|
||||||
|
System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
|
||||||
|
|> String.to_integer()
|
||||||
|
|
||||||
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
|
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
|
||||||
|
Logger.info("Updating lag information every #{interval} milliseconds")
|
||||||
|
|
||||||
Process.send_after(self(), :tick, @interval)
|
Process.send_after(self(), :tick, interval)
|
||||||
|
|
||||||
{:ok, %{endpoints: endpoints}}
|
{:ok, %{endpoints: endpoints, interval: interval}}
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_info(:tick, state) do
|
def handle_info(:tick, state) do
|
||||||
[endpoint | _] = state.endpoints
|
[endpoint | _] = state.endpoints
|
||||||
|
interval = state.interval
|
||||||
|
|
||||||
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
||||||
|
|
||||||
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
|
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
|
||||||
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
|
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
|
||||||
|
|
||||||
Process.send_after(self(), :tick, @interval)
|
Process.send_after(self(), :tick, interval)
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -7,8 +7,6 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
|
|
||||||
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
|
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
|
||||||
|
|
||||||
require Logger
|
|
||||||
|
|
||||||
@default_client :client1
|
@default_client :client1
|
||||||
|
|
||||||
def connection, do: connection(@default_client)
|
def connection, do: connection(@default_client)
|
||||||
@@ -46,10 +44,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
|
|
||||||
group_descriptions
|
group_descriptions
|
||||||
|> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
|
|> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
|
||||||
get_member_info(members)
|
get_member_info(consumer_group, members)
|
||||||
|> Enum.map(fn {topics, consumer_id, member_host} ->
|
|
||||||
{consumer_group, topics, consumer_id, member_host}
|
|
||||||
end)
|
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -63,7 +58,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
fetch_committed_offsets(topic, consumer_group, client)
|
fetch_committed_offsets(topic, consumer_group, client)
|
||||||
|> Enum.sort_by(fn {key, _value} -> key end)
|
|> Enum.sort_by(fn {key, _value} -> key end)
|
||||||
|
|
||||||
for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
|
for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
|
||||||
{part, current - committed}
|
{part, current - committed}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -75,8 +70,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
|
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
|
||||||
|
|
||||||
for i <- Range.new(0, partitions_count - 1),
|
for i <- Range.new(0, partitions_count - 1),
|
||||||
{:ok, offset} =
|
{:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
|
||||||
KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
|
|
||||||
{i, offset}
|
{i, offset}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -88,17 +82,22 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
|
|
||||||
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
|
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
|
||||||
|
|
||||||
for r <- response,
|
for r <- response, pr <- r[:partitions] do
|
||||||
pr <- r[:partitions],
|
{pr[:partition_index], pr[:committed_offset]}
|
||||||
do: {pr[:partition_index], pr[:committed_offset]}
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@spec get_member_info(
|
@spec get_member_info(
|
||||||
|
binary,
|
||||||
list(%{client_host: binary, member_assignment: binary, member_id: binary})
|
list(%{client_host: binary, member_assignment: binary, member_id: binary})
|
||||||
) ::
|
) ::
|
||||||
list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
|
list(
|
||||||
defp get_member_info(members) do
|
{consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
|
||||||
Enum.map(members, fn %{
|
member_host :: binary}
|
||||||
|
)
|
||||||
|
defp get_member_info(consumer_group, members) do
|
||||||
|
members
|
||||||
|
|> Enum.map(fn %{
|
||||||
client_host: member_host,
|
client_host: member_host,
|
||||||
member_assignment: member_assignment,
|
member_assignment: member_assignment,
|
||||||
member_id: consumer_id
|
member_id: consumer_id
|
||||||
@@ -106,5 +105,8 @@ defmodule KafkaexLagExporter.KafkaUtils do
|
|||||||
topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
|
topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
|
||||||
{topics, consumer_id, member_host}
|
{topics, consumer_id, member_host}
|
||||||
end)
|
end)
|
||||||
|
|> Enum.map(fn {topics, consumer_id, member_host} ->
|
||||||
|
{consumer_group, topics, consumer_id, member_host}
|
||||||
|
end)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user