Compare commits

...

4 Commits

Author SHA1 Message Date
98e79b927d Update docs to include configurable interval 2024-04-16 22:21:46 +02:00
9e9676d4e4 Remove code for Tilt cluster 2024-04-16 22:16:44 +02:00
bf8389b4e0 Simplify code 2024-04-16 22:11:50 +02:00
ee5b69f646 Make update interval configurable 2024-04-16 22:08:52 +02:00
22 changed files with 43 additions and 417 deletions

View File

@@ -1,4 +1,2 @@
elixir 1.16.2-otp-26
erlang 26.0.2
tilt 0.33.11
kind 0.22.0

View File

@@ -24,12 +24,21 @@ partition for this group.
## Start
```bash
docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 -p 4000:4000 \
lechindianer/kafkaex_lag_exporter:0.2.0
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
```
Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).
## Configuration
KafkaExLagExporter uses 5 seconds as default interval to update the lags. If you want to configure it to use another
value set `KAFKA_EX_INTERVAL_MS`, i.e.
```bash
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
lechindianer/kafkaex_lag_exporter:0.2.0
```
## Developing
To start the project locally:
@@ -38,11 +47,11 @@ To start the project locally:
KAFKA_BROKERS="localhost:9092" iex -S mix
```
There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
KafkaexLagExporter:
```bash
docker-compose up --build
docker compose up --build
```
Kowl is served at [http://localhost:8080](http://localhost:8080).

View File

@@ -1,3 +0,0 @@
allow_k8s_contexts('default')
include('konvert/Tiltfile')

View File

@@ -1,8 +0,0 @@
kafka:
brokers:
- kafka1:9092
- kafka2:9092
- kafka3:9092
# server:
# listenPort: 8080

View File

@@ -1,15 +0,0 @@
#!/usr/bin/env bash
kind delete cluster
kind create cluster --config kind.yaml
kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m
kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml
LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "address=/kind.cluster/$LB_IP"

View File

@@ -1,24 +0,0 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraPortMappings:
- containerPort: 80
hostPort: 80
protocol: TCP
- containerPort: 443
hostPort: 443
protocol: TCP
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f

View File

@@ -1,21 +0,0 @@
k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
k8s_yaml('./redpanda-deployment.yaml')
k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
k8s_yaml('./redpanda-service.yaml')
k8s_yaml('./connect-service.yaml')
k8s_yaml('./connect-deployment.yaml')
k8s_yaml('./owl-shop-deployment.yaml')
k8s_yaml('./console-deployment.yaml')
k8s_yaml('./console-service.yaml')
k8s_yaml('./console-ingress.yaml')
docker_build('kafka-lag-exporter', './..')
k8s_yaml('./kafka-lag-exporter-deployment.yaml')
k8s_yaml('./kafka-lag-exporter-service.yaml')
k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)

View File

@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: connect
name: connect
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: connect
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: connect
spec:
containers:
- env:
- name: CONNECT_BOOTSTRAP_SERVERS
value: redpanda:29092
- name: CONNECT_CONFIGURATION
value: |
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
group.id=connectors-cluster
offset.storage.topic=_internal_connectors_offsets
config.storage.topic=_internal_connectors_configs
status.storage.topic=_internal_connectors_status
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
offset.flush.interval.ms=1000
producer.linger.ms=50
producer.batch.size=131072
- name: CONNECT_GC_LOG_ENABLED
value: "false"
- name: CONNECT_HEAP_OPTS
value: -Xms512M -Xmx512M
- name: CONNECT_LOG_LEVEL
value: info
image: docker.redpanda.com/redpandadata/connectors:latest
name: connect
ports:
- containerPort: 8083
hostname: connect
restartPolicy: Always

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: connect
name: connect
spec:
ports:
- name: "8083"
port: 8083
targetPort: 8083
selector:
io.kompose.service: connect

View File

@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: console
name: console
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: console
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: console
spec:
containers:
- args:
- -c
- echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
command:
- /bin/sh
env:
- name: CONFIG_FILEPATH
value: /tmp/config.yml
- name: CONSOLE_CONFIG_FILE
value: |
kafka:
brokers: ["redpanda:29092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda:8081"]
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda:9644"]
connect:
enabled: true
clusters:
- name: local-connect-cluster
url: http://connect:8083
image: docker.redpanda.com/redpandadata/console:v2.2.4
name: console
ports:
- containerPort: 8080
restartPolicy: Always

View File

@@ -1,17 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ingress-myserviceb
spec:
rules:
- host: console.lechindianer.hack
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: console
port:
number: 8080
ingressClassName: nginx

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: console
name: console
spec:
ports:
- name: "8080"
port: 8080
targetPort: 8080
selector:
io.kompose.service: console

View File

@@ -1,26 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: kafka-lag-exporter
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-default: "true"
io.kompose.service: kafka-lag-exporter
spec:
containers:
- env:
- name: KAFKA_BROKERS
value: redpanda:29092
image: kafka-lag-exporter
name: kafka-lag-exporter
ports:
- containerPort: 4000
restartPolicy: Always

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
ports:
- name: "4000"
port: 4000
targetPort: 4000
selector:
io.kompose.service: kafka-lag-exporter

View File

@@ -1,31 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: owl-shop
name: owl-shop
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: owl-shop
template:
metadata:
creationTimestamp: null
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: owl-shop
spec:
containers:
- env:
- name: SHOP_KAFKA_BROKERS
value: redpanda:29092
- name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
value: "1"
- name: SHOP_TRAFFIC_INTERVAL_DURATION
value: 0.1s
- name: SHOP_TRAFFIC_INTERVAL_RATE
value: "6"
image: quay.io/cloudhut/owl-shop:latest
name: owl-shop
restartPolicy: Always

View File

@@ -1,44 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: redpanda
strategy:
type: Recreate
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: redpanda
spec:
containers:
- args:
- redpanda start
- --smp 1
- --overprovisioned
- --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr 0.0.0.0:8082
- --advertise-pandaproxy-addr localhost:8082
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
name: redpanda
ports:
- containerPort: 8081
- containerPort: 8082
- containerPort: 9092
- containerPort: 9644
- containerPort: 29092
volumeMounts:
- mountPath: /var/lib/redpanda/data
name: redpanda
restartPolicy: Always
volumes:
- name: redpanda
persistentVolumeClaim:
claimName: redpanda

View File

@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-default
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"

View File

@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-redpanda-network
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"

View File

@@ -1,12 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Mi

View File

@@ -1,25 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
ports:
- name: "8081"
port: 8081
targetPort: 8081
- name: "8082"
port: 8082
targetPort: 8082
- name: "9092"
port: 9092
targetPort: 9092
- name: "9644"
port: 9644
targetPort: 9644
- name: "29092"
port: 29092
targetPort: 29092
selector:
io.kompose.service: redpanda

View File

@@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
require Logger
@interval 5_000
def start_link(default) when is_list(default) do
GenServer.start_link(__MODULE__, default, name: __MODULE__)
end
@@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
clients = Application.get_env(:brod, :clients)
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
interval =
System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
|> String.to_integer()
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
Logger.info("Updating lag information every #{interval} milliseconds")
Process.send_after(self(), :tick, @interval)
Process.send_after(self(), :tick, interval)
{:ok, %{endpoints: endpoints}}
{:ok, %{endpoints: endpoints, interval: interval}}
end
@impl true
def handle_info(:tick, state) do
[endpoint | _] = state.endpoints
interval = state.interval
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
Process.send_after(self(), :tick, @interval)
Process.send_after(self(), :tick, interval)
{:noreply, state}
end

View File

@@ -7,8 +7,6 @@ defmodule KafkaexLagExporter.KafkaUtils do
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
require Logger
@default_client :client1
def connection, do: connection(@default_client)
@@ -46,10 +44,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
group_descriptions
|> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
get_member_info(members)
|> Enum.map(fn {topics, consumer_id, member_host} ->
{consumer_group, topics, consumer_id, member_host}
end)
get_member_info(consumer_group, members)
end)
end
@@ -63,7 +58,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
fetch_committed_offsets(topic, consumer_group, client)
|> Enum.sort_by(fn {key, _value} -> key end)
for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
{part, current - committed}
end
end
@@ -75,8 +70,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
for i <- Range.new(0, partitions_count - 1),
{:ok, offset} =
KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
{:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
{i, offset}
end
end
@@ -88,23 +82,31 @@ defmodule KafkaexLagExporter.KafkaUtils do
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
for r <- response,
pr <- r[:partitions],
do: {pr[:partition_index], pr[:committed_offset]}
for r <- response, pr <- r[:partitions] do
{pr[:partition_index], pr[:committed_offset]}
end
end
@spec get_member_info(
binary,
list(%{client_host: binary, member_assignment: binary, member_id: binary})
) ::
list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
defp get_member_info(members) do
Enum.map(members, fn %{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
} ->
list(
{consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
member_host :: binary}
)
defp get_member_info(consumer_group, members) do
members
|> Enum.map(fn %{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
} ->
topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
{topics, consumer_id, member_host}
end)
|> Enum.map(fn {topics, consumer_id, member_host} ->
{consumer_group, topics, consumer_id, member_host}
end)
end
end