Compare commits

4 Commits: 7104414bc1 ... 98e79b927d

| Author | SHA1 | Date |
|---|---|---|
| | 98e79b927d | |
| | 9e9676d4e4 | |
| | bf8389b4e0 | |
| | ee5b69f646 | |
@@ -1,4 +1,2 @@
elixir 1.16.2-otp-26
erlang 26.0.2
tilt 0.33.11
kind 0.22.0
README.md
@@ -24,12 +24,21 @@ partition for this group.
## Start

```bash
docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 -p 4000:4000 \
  lechindianer/kafkaex_lag_exporter:0.2.0
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
```

Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).

## Configuration

KafkaExLagExporter uses 5 seconds as default interval to update the lags. If you want to configure it to use another
value set `KAFKA_EX_INTERVAL_MS`, i.e.

```bash
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
  lechindianer/kafkaex_lag_exporter:0.2.0
```

## Developing

To start the project locally:
@@ -38,11 +47,11 @@ To start the project locally:
KAFKA_BROKERS="localhost:9092" iex -S mix
```

There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
KafkaexLagExporter:

```bash
docker-compose up --build
docker compose up --build
```

Kowl is served at [http://localhost:8080](http://localhost:8080).
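The `KAFKA_BROKERS` value used in the commands above is a comma-separated list of `host:port` pairs, while the Elixir code later in this compare works with `{host, port}` endpoint tuples. The sketch below only illustrates that mapping; it is not the project's actual configuration code, and the variable names are hypothetical.

```elixir
# Illustration only: turn a KAFKA_BROKERS-style string into {host, port} tuples.
# The exporter's real configuration path may differ from this sketch.
brokers = System.get_env("KAFKA_BROKERS", "localhost:9093,localhost:9094,localhost:9095")

endpoints =
  brokers
  |> String.split(",", trim: true)
  |> Enum.map(fn entry ->
    [host, port] = String.split(entry, ":")
    {host, String.to_integer(port)}
  end)

IO.inspect(endpoints)
# => [{"localhost", 9093}, {"localhost", 9094}, {"localhost", 9095}]
```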
@@ -1,8 +0,0 @@
kafka:
  brokers:
    - kafka1:9092
    - kafka2:9092
    - kafka3:9092

# server:
#   listenPort: 8080
@@ -1,15 +0,0 @@
#!/usr/bin/env bash

kind delete cluster

kind create cluster --config kind.yaml

kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m

kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -

kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml

LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')

echo "address=/kind.cluster/$LB_IP"
kind.yaml
@@ -1,24 +0,0 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  kubeadmConfigPatches:
  - |
    kind: InitConfiguration
    nodeRegistration:
      kubeletExtraArgs:
        node-labels: "ingress-ready=true"
  extraPortMappings:
  - containerPort: 80
    hostPort: 80
    protocol: TCP
  - containerPort: 443
    hostPort: 443
    protocol: TCP
  # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
  # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
  # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
  # image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
@@ -1,21 +0,0 @@
k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
k8s_yaml('./redpanda-deployment.yaml')
k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
k8s_yaml('./redpanda-service.yaml')

k8s_yaml('./connect-service.yaml')
k8s_yaml('./connect-deployment.yaml')

k8s_yaml('./owl-shop-deployment.yaml')

k8s_yaml('./console-deployment.yaml')
k8s_yaml('./console-service.yaml')
k8s_yaml('./console-ingress.yaml')

docker_build('kafka-lag-exporter', './..')

k8s_yaml('./kafka-lag-exporter-deployment.yaml')
k8s_yaml('./kafka-lag-exporter-service.yaml')

k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)
@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    io.kompose.service: connect
  name: connect
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: connect
  template:
    metadata:
      labels:
        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
        io.kompose.service: connect
    spec:
      containers:
        - env:
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: redpanda:29092
            - name: CONNECT_CONFIGURATION
              value: |
                key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
                value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
                group.id=connectors-cluster
                offset.storage.topic=_internal_connectors_offsets
                config.storage.topic=_internal_connectors_configs
                status.storage.topic=_internal_connectors_status
                config.storage.replication.factor=-1
                offset.storage.replication.factor=-1
                status.storage.replication.factor=-1
                offset.flush.interval.ms=1000
                producer.linger.ms=50
                producer.batch.size=131072
            - name: CONNECT_GC_LOG_ENABLED
              value: "false"
            - name: CONNECT_HEAP_OPTS
              value: -Xms512M -Xmx512M
            - name: CONNECT_LOG_LEVEL
              value: info
          image: docker.redpanda.com/redpandadata/connectors:latest
          name: connect
          ports:
            - containerPort: 8083
      hostname: connect
      restartPolicy: Always
@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  labels:
    io.kompose.service: connect
  name: connect
spec:
  ports:
    - name: "8083"
      port: 8083
      targetPort: 8083
  selector:
    io.kompose.service: connect
@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    io.kompose.service: console
  name: console
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: console
  template:
    metadata:
      labels:
        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
        io.kompose.service: console
    spec:
      containers:
        - args:
            - -c
            - echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
          command:
            - /bin/sh
          env:
            - name: CONFIG_FILEPATH
              value: /tmp/config.yml
            - name: CONSOLE_CONFIG_FILE
              value: |
                kafka:
                  brokers: ["redpanda:29092"]
                  schemaRegistry:
                    enabled: true
                    urls: ["http://redpanda:8081"]
                redpanda:
                  adminApi:
                    enabled: true
                    urls: ["http://redpanda:9644"]
                connect:
                  enabled: true
                  clusters:
                    - name: local-connect-cluster
                      url: http://connect:8083
          image: docker.redpanda.com/redpandadata/console:v2.2.4
          name: console
          ports:
            - containerPort: 8080
      restartPolicy: Always
@@ -1,17 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ingress-myserviceb
spec:
  rules:
    - host: console.lechindianer.hack
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: console
                port:
                  number: 8080
  ingressClassName: nginx
@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  labels:
    io.kompose.service: console
  name: console
spec:
  ports:
    - name: "8080"
      port: 8080
      targetPort: 8080
  selector:
    io.kompose.service: console
@@ -1,26 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    io.kompose.service: kafka-lag-exporter
  name: kafka-lag-exporter
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: kafka-lag-exporter
  template:
    metadata:
      labels:
        io.kompose.network/redpanda-owl-shop-default: "true"
        io.kompose.service: kafka-lag-exporter
    spec:
      containers:
        - env:
            - name: KAFKA_BROKERS
              value: redpanda:29092
          image: kafka-lag-exporter
          name: kafka-lag-exporter
          ports:
            - containerPort: 4000
      restartPolicy: Always
@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  labels:
    io.kompose.service: kafka-lag-exporter
  name: kafka-lag-exporter
spec:
  ports:
    - name: "4000"
      port: 4000
      targetPort: 4000
  selector:
    io.kompose.service: kafka-lag-exporter
@@ -1,31 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    io.kompose.service: owl-shop
  name: owl-shop
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: owl-shop
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
        io.kompose.service: owl-shop
    spec:
      containers:
        - env:
            - name: SHOP_KAFKA_BROKERS
              value: redpanda:29092
            - name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
              value: "1"
            - name: SHOP_TRAFFIC_INTERVAL_DURATION
              value: 0.1s
            - name: SHOP_TRAFFIC_INTERVAL_RATE
              value: "6"
          image: quay.io/cloudhut/owl-shop:latest
          name: owl-shop
      restartPolicy: Always
@@ -1,44 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    io.kompose.service: redpanda
  name: redpanda
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: redpanda
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
        io.kompose.service: redpanda
    spec:
      containers:
        - args:
            - redpanda start
            - --smp 1
            - --overprovisioned
            - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
            - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
            - --pandaproxy-addr 0.0.0.0:8082
            - --advertise-pandaproxy-addr localhost:8082
          image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
          name: redpanda
          ports:
            - containerPort: 8081
            - containerPort: 8082
            - containerPort: 9092
            - containerPort: 9644
            - containerPort: 29092
          volumeMounts:
            - mountPath: /var/lib/redpanda/data
              name: redpanda
      restartPolicy: Always
      volumes:
        - name: redpanda
          persistentVolumeClaim:
            claimName: redpanda
@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: redpanda-owl-shop-default
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              io.kompose.network/redpanda-owl-shop-default: "true"
  podSelector:
    matchLabels:
      io.kompose.network/redpanda-owl-shop-default: "true"
@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: redpanda-owl-shop-redpanda-network
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
  podSelector:
    matchLabels:
      io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
@@ -1,12 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    io.kompose.service: redpanda
  name: redpanda
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Mi
@@ -1,25 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  labels:
    io.kompose.service: redpanda
  name: redpanda
spec:
  ports:
    - name: "8081"
      port: 8081
      targetPort: 8081
    - name: "8082"
      port: 8082
      targetPort: 8082
    - name: "9092"
      port: 9092
      targetPort: 9092
    - name: "9644"
      port: 9644
      targetPort: 9644
    - name: "29092"
      port: 29092
      targetPort: 29092
  selector:
    io.kompose.service: redpanda
@@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do

  require Logger

  @interval 5_000

  def start_link(default) when is_list(default) do
    GenServer.start_link(__MODULE__, default, name: __MODULE__)
  end
@@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
    clients = Application.get_env(:brod, :clients)
    endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]

    interval =
      System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
      |> String.to_integer()

    Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
    Logger.info("Updating lag information every #{interval} milliseconds")

    Process.send_after(self(), :tick, @interval)
    Process.send_after(self(), :tick, interval)

    {:ok, %{endpoints: endpoints}}
    {:ok, %{endpoints: endpoints, interval: interval}}
  end

  @impl true
  def handle_info(:tick, state) do
    [endpoint | _] = state.endpoints
    interval = state.interval

    %{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)

    KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
    KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)

    Process.send_after(self(), :tick, @interval)
    Process.send_after(self(), :tick, interval)

    {:noreply, state}
  end
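The `ConsumerOffsetRunner` hunks above replace the hard-coded `@interval 5_000` with a value read from `KAFKA_EX_INTERVAL_MS` and carried in the GenServer state. Below is a minimal, self-contained sketch of that scheduling pattern, not the project's actual module: the module name is hypothetical and the periodic work is elided.

```elixir
# Minimal sketch of a GenServer that reads its tick interval from the environment
# (hypothetical module name; illustration of the pattern only).
defmodule IntervalTickSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    # Read the interval once at startup, defaulting to 5000 ms.
    interval =
      System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
      |> String.to_integer()

    # Keep it in state so every reschedule uses the configured value.
    Process.send_after(self(), :tick, interval)
    {:ok, %{interval: interval}}
  end

  @impl true
  def handle_info(:tick, state) do
    # Periodic work would go here (e.g. fetching and publishing lag metrics),
    # then the next tick is scheduled with the same interval.
    Process.send_after(self(), :tick, state.interval)
    {:noreply, state}
  end
end
```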
@@ -7,8 +7,6 @@ defmodule KafkaexLagExporter.KafkaUtils do

  alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper

  require Logger

  @default_client :client1

  def connection, do: connection(@default_client)
@@ -46,10 +44,7 @@ defmodule KafkaexLagExporter.KafkaUtils do

    group_descriptions
    |> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
      get_member_info(members)
      |> Enum.map(fn {topics, consumer_id, member_host} ->
        {consumer_group, topics, consumer_id, member_host}
      end)
      get_member_info(consumer_group, members)
    end)
  end

@@ -63,7 +58,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
      fetch_committed_offsets(topic, consumer_group, client)
      |> Enum.sort_by(fn {key, _value} -> key end)

    for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
    for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
      {part, current - committed}
    end
  end
@@ -75,8 +70,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
    {:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)

    for i <- Range.new(0, partitions_count - 1),
        {:ok, offset} =
          KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
        {:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
      {i, offset}
    end
  end
@@ -88,17 +82,22 @@ defmodule KafkaexLagExporter.KafkaUtils do

    {:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)

    for r <- response,
        pr <- r[:partitions],
        do: {pr[:partition_index], pr[:committed_offset]}
    for r <- response, pr <- r[:partitions] do
      {pr[:partition_index], pr[:committed_offset]}
    end
  end

  @spec get_member_info(
          binary,
          list(%{client_host: binary, member_assignment: binary, member_id: binary})
        ) ::
          list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
  defp get_member_info(members) do
    Enum.map(members, fn %{
          list(
            {consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
             member_host :: binary}
          )
  defp get_member_info(consumer_group, members) do
    members
    |> Enum.map(fn %{
                     client_host: member_host,
                     member_assignment: member_assignment,
                     member_id: consumer_id
@@ -106,5 +105,8 @@ defmodule KafkaexLagExporter.KafkaUtils do
      topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
      {topics, consumer_id, member_host}
    end)
    |> Enum.map(fn {topics, consumer_id, member_host} ->
      {consumer_group, topics, consumer_id, member_host}
    end)
  end
end
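The `KafkaUtils` hunks above compute per-partition lag by zipping the latest offsets with the committed offsets, both sorted by partition, and subtracting. A standalone illustration of that arithmetic, using made-up sample data rather than anything from the project:

```elixir
# Sample data (hypothetical): {partition, offset} tuples sorted by partition.
latest_offsets = [{0, 120}, {1, 95}, {2, 200}]
committed_offsets = [{0, 110}, {1, 95}, {2, 180}]

# Per-partition lag: latest offset minus committed offset, paired by position.
lags =
  for {{part, latest}, {_part, committed}} <- Enum.zip(latest_offsets, committed_offsets) do
    {part, latest - committed}
  end

IO.inspect(lags)
# => [{0, 10}, {1, 0}, {2, 20}]

# The summed lag for the consumer group is then just the sum over partitions.
IO.inspect(lags |> Enum.map(fn {_part, lag} -> lag end) |> Enum.sum())
# => 30
```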