Compare commits

...

6 Commits

Author SHA1 Message Date
29c032cb16 Add license 2024-06-01 12:02:18 +02:00
a34e91770f Simplify test code 2024-04-16 22:33:28 +02:00
98e79b927d Update docs to include configurable interval 2024-04-16 22:21:46 +02:00
9e9676d4e4 Remove code for Tilt cluster 2024-04-16 22:16:44 +02:00
bf8389b4e0 Simplify code 2024-04-16 22:11:50 +02:00
ee5b69f646 Make update interval configurable 2024-04-16 22:08:52 +02:00
25 changed files with 80 additions and 463 deletions

View File

@@ -1,4 +1,2 @@
elixir 1.16.2-otp-26
erlang 26.0.2
tilt 0.33.11
kind 0.22.0

12
LICENSE Normal file
View File

@@ -0,0 +1,12 @@
ISC License
Copyright 2024 Pascal Schmid
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
granted, provided that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
OF THIS SOFTWARE.

View File

@@ -24,12 +24,21 @@ partition for this group.
## Start
```bash
docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 -p 4000:4000 \
lechindianer/kafkaex_lag_exporter:0.2.0
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
```
Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).
## Configuration
KafkaExLagExporter uses 5 seconds as default interval to update the lags. If you want to configure it to use another
value set `KAFKA_EX_INTERVAL_MS`, i.e.
```bash
docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
lechindianer/kafkaex_lag_exporter:0.2.0
```
## Developing
To start the project locally:
@@ -38,11 +47,11 @@ To start the project locally:
KAFKA_BROKERS="localhost:9092" iex -S mix
```
There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
KafkaexLagExporter:
```bash
docker-compose up --build
docker compose up --build
```
Kowl is served at [http://localhost:8080](http://localhost:8080).

View File

@@ -1,3 +0,0 @@
allow_k8s_contexts('default')
include('konvert/Tiltfile')

View File

@@ -1,8 +0,0 @@
kafka:
brokers:
- kafka1:9092
- kafka2:9092
- kafka3:9092
# server:
# listenPort: 8080

View File

@@ -1,15 +0,0 @@
#!/usr/bin/env bash
kind delete cluster
kind create cluster --config kind.yaml
kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m
kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml
LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "address=/kind.cluster/$LB_IP"

View File

@@ -1,24 +0,0 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraPortMappings:
- containerPort: 80
hostPort: 80
protocol: TCP
- containerPort: 443
hostPort: 443
protocol: TCP
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f

View File

@@ -1,21 +0,0 @@
k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
k8s_yaml('./redpanda-deployment.yaml')
k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
k8s_yaml('./redpanda-service.yaml')
k8s_yaml('./connect-service.yaml')
k8s_yaml('./connect-deployment.yaml')
k8s_yaml('./owl-shop-deployment.yaml')
k8s_yaml('./console-deployment.yaml')
k8s_yaml('./console-service.yaml')
k8s_yaml('./console-ingress.yaml')
docker_build('kafka-lag-exporter', './..')
k8s_yaml('./kafka-lag-exporter-deployment.yaml')
k8s_yaml('./kafka-lag-exporter-service.yaml')
k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)

View File

@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: connect
name: connect
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: connect
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: connect
spec:
containers:
- env:
- name: CONNECT_BOOTSTRAP_SERVERS
value: redpanda:29092
- name: CONNECT_CONFIGURATION
value: |
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
group.id=connectors-cluster
offset.storage.topic=_internal_connectors_offsets
config.storage.topic=_internal_connectors_configs
status.storage.topic=_internal_connectors_status
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
offset.flush.interval.ms=1000
producer.linger.ms=50
producer.batch.size=131072
- name: CONNECT_GC_LOG_ENABLED
value: "false"
- name: CONNECT_HEAP_OPTS
value: -Xms512M -Xmx512M
- name: CONNECT_LOG_LEVEL
value: info
image: docker.redpanda.com/redpandadata/connectors:latest
name: connect
ports:
- containerPort: 8083
hostname: connect
restartPolicy: Always

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: connect
name: connect
spec:
ports:
- name: "8083"
port: 8083
targetPort: 8083
selector:
io.kompose.service: connect

View File

@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: console
name: console
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: console
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: console
spec:
containers:
- args:
- -c
- echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
command:
- /bin/sh
env:
- name: CONFIG_FILEPATH
value: /tmp/config.yml
- name: CONSOLE_CONFIG_FILE
value: |
kafka:
brokers: ["redpanda:29092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda:8081"]
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda:9644"]
connect:
enabled: true
clusters:
- name: local-connect-cluster
url: http://connect:8083
image: docker.redpanda.com/redpandadata/console:v2.2.4
name: console
ports:
- containerPort: 8080
restartPolicy: Always

View File

@@ -1,17 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ingress-myserviceb
spec:
rules:
- host: console.lechindianer.hack
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: console
port:
number: 8080
ingressClassName: nginx

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: console
name: console
spec:
ports:
- name: "8080"
port: 8080
targetPort: 8080
selector:
io.kompose.service: console

View File

@@ -1,26 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: kafka-lag-exporter
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-default: "true"
io.kompose.service: kafka-lag-exporter
spec:
containers:
- env:
- name: KAFKA_BROKERS
value: redpanda:29092
image: kafka-lag-exporter
name: kafka-lag-exporter
ports:
- containerPort: 4000
restartPolicy: Always

View File

@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
ports:
- name: "4000"
port: 4000
targetPort: 4000
selector:
io.kompose.service: kafka-lag-exporter

View File

@@ -1,31 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: owl-shop
name: owl-shop
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: owl-shop
template:
metadata:
creationTimestamp: null
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: owl-shop
spec:
containers:
- env:
- name: SHOP_KAFKA_BROKERS
value: redpanda:29092
- name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
value: "1"
- name: SHOP_TRAFFIC_INTERVAL_DURATION
value: 0.1s
- name: SHOP_TRAFFIC_INTERVAL_RATE
value: "6"
image: quay.io/cloudhut/owl-shop:latest
name: owl-shop
restartPolicy: Always

View File

@@ -1,44 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: redpanda
strategy:
type: Recreate
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: redpanda
spec:
containers:
- args:
- redpanda start
- --smp 1
- --overprovisioned
- --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr 0.0.0.0:8082
- --advertise-pandaproxy-addr localhost:8082
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
name: redpanda
ports:
- containerPort: 8081
- containerPort: 8082
- containerPort: 9092
- containerPort: 9644
- containerPort: 29092
volumeMounts:
- mountPath: /var/lib/redpanda/data
name: redpanda
restartPolicy: Always
volumes:
- name: redpanda
persistentVolumeClaim:
claimName: redpanda

View File

@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-default
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"

View File

@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-redpanda-network
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"

View File

@@ -1,12 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Mi

View File

@@ -1,25 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
ports:
- name: "8081"
port: 8081
targetPort: 8081
- name: "8082"
port: 8082
targetPort: 8082
- name: "9092"
port: 9092
targetPort: 9092
- name: "9644"
port: 9644
targetPort: 9644
- name: "29092"
port: 29092
targetPort: 29092
selector:
io.kompose.service: redpanda

View File

@@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
require Logger
@interval 5_000
def start_link(default) when is_list(default) do
GenServer.start_link(__MODULE__, default, name: __MODULE__)
end
@@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
clients = Application.get_env(:brod, :clients)
endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
interval =
System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
|> String.to_integer()
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
Logger.info("Updating lag information every #{interval} milliseconds")
Process.send_after(self(), :tick, @interval)
Process.send_after(self(), :tick, interval)
{:ok, %{endpoints: endpoints}}
{:ok, %{endpoints: endpoints, interval: interval}}
end
@impl true
def handle_info(:tick, state) do
[endpoint | _] = state.endpoints
interval = state.interval
%{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
Process.send_after(self(), :tick, @interval)
Process.send_after(self(), :tick, interval)
{:noreply, state}
end

View File

@@ -7,8 +7,6 @@ defmodule KafkaexLagExporter.KafkaUtils do
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
require Logger
@default_client :client1
def connection, do: connection(@default_client)
@@ -46,10 +44,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
group_descriptions
|> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
get_member_info(members)
|> Enum.map(fn {topics, consumer_id, member_host} ->
{consumer_group, topics, consumer_id, member_host}
end)
get_member_info(consumer_group, members)
end)
end
@@ -63,7 +58,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
fetch_committed_offsets(topic, consumer_group, client)
|> Enum.sort_by(fn {key, _value} -> key end)
for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
{part, current - committed}
end
end
@@ -75,8 +70,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
{:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
for i <- Range.new(0, partitions_count - 1),
{:ok, offset} =
KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
{:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
{i, offset}
end
end
@@ -88,23 +82,31 @@ defmodule KafkaexLagExporter.KafkaUtils do
{:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
for r <- response,
pr <- r[:partitions],
do: {pr[:partition_index], pr[:committed_offset]}
for r <- response, pr <- r[:partitions] do
{pr[:partition_index], pr[:committed_offset]}
end
end
@spec get_member_info(
binary,
list(%{client_host: binary, member_assignment: binary, member_id: binary})
) ::
list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
defp get_member_info(members) do
Enum.map(members, fn %{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
} ->
list(
{consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
member_host :: binary}
)
defp get_member_info(consumer_group, members) do
members
|> Enum.map(fn %{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
} ->
topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
{topics, consumer_id, member_host}
end)
|> Enum.map(fn {topics, consumer_id, member_host} ->
{consumer_group, topics, consumer_id, member_host}
end)
end
end

View File

@@ -40,9 +40,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
end
test "should return the calculated lags" do
test_endpoint = {"test endpoint", 666}
%{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint)
%{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get({"test endpoint", 666})
assert sum == [
%ConsumerOffset{

View File

@@ -5,9 +5,7 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
alias KafkaexLagExporter.KafkaUtils
@test_host "test_host"
@test_port "1234"
@test_endpoint {@test_host, @test_port}
@test_endpoint {"test_host", "1234"}
@test_sock_opts [ssl: []]
@test_group_name1 "test-consumer_group1"
@test_group_name2 "test-consumer_group"
@@ -28,18 +26,9 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
consumer_info1 = {:a, @test_group_name1, "consumer"}
consumer_info2 = {:b, @test_group_name2, "something-other"}
patch(
KafkaWrapper,
:list_all_groups,
fn _, _ ->
[
{
@test_endpoint,
[consumer_info1, consumer_info2]
}
]
end
)
patch(KafkaWrapper, :list_all_groups, fn _, _ ->
[{@test_endpoint, [consumer_info1, consumer_info2]}]
end)
group_names = KafkaUtils.get_consumer_group_names(@test_endpoint)
@@ -63,27 +52,23 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
consumer_id = "test_consumer_id"
member_host = "test_member_host"
patch(
KafkaWrapper,
:describe_groups,
fn _, _, _ ->
{
:ok,
[
%{
group_id: consumer_group_name,
members: [
%{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
}
]
}
]
}
end
)
patch(KafkaWrapper, :describe_groups, fn _, _, _ ->
{
:ok,
[
%{
group_id: consumer_group_name,
members: [
%{
client_host: member_host,
member_assignment: member_assignment,
member_id: consumer_id
}
]
}
]
}
end)
patch(KafkaexLagExporter.TopicNameParser, :parse_topic_names, fn _ -> [@test_topic_name] end)
@@ -147,13 +132,9 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
patch(KafkaWrapper, :resolve_offset, fn _, _, _, _, _ -> {:ok, resolved_offset} end)
patch(
KafkaWrapper,
:fetch_committed_offsets,
fn _, _, _ ->
{:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
end
)
patch(KafkaWrapper, :fetch_committed_offsets, fn _, _, _ ->
{:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
end)
lag = KafkaUtils.lag(@test_topic_name, consumer_group, client)