Compare commits
10 Commits

SHA1 Message Date
29c032cb16 Add license 2024-06-01 12:02:18 +02:00
a34e91770f Simplify test code 2024-04-16 22:33:28 +02:00
98e79b927d Update docs to include configurable interval 2024-04-16 22:21:46 +02:00
9e9676d4e4 Remove code for Tilt cluster 2024-04-16 22:16:44 +02:00
bf8389b4e0 Simplify code 2024-04-16 22:11:50 +02:00
ee5b69f646 Make update interval configurable 2024-04-16 22:08:52 +02:00
7104414bc1 Fix network issues in Docker 2024-04-16 08:11:01 +02:00
d580b0f1d0 Update console container 2024-04-16 08:10:46 +02:00
0312a1e37c Remove commented code 2024-04-16 08:10:27 +02:00
c98e8da2ec Format README 2024-04-16 08:09:41 +02:00
26 changed files with 95 additions and 473 deletions


@@ -1,4 +1,2 @@
 elixir 1.16.2-otp-26
 erlang 26.0.2
-tilt 0.33.11
-kind 0.22.0

LICENSE (new file)

@@ -0,0 +1,12 @@
ISC License
Copyright 2024 Pascal Schmid
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
granted, provided that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
OF THIS SOFTWARE.


@@ -1,30 +1,43 @@
-# KafkaexLagExporter
+# KafkaExLagExporter
 This project will collect Kafka consumer lag and provide them via Prometheus.
 ## Metrics
-[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner. KafkaexLagExporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus.
+[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner.
+KafkaExLagExporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus.
 **`kafka_consumergroup_group_topic_sum_lag`**
 Labels: `cluster_name, group, topic, consumer_id, member_host`
-The sum of the difference between the last produced offset and the last consumed offset of all partitions in this topic for this group.
+The sum of the difference between the last produced offset and the last consumed offset of all partitions in this
+topic for this group.
 **`kafka_consumergroup_group_lag`**
 Labels: `cluster_name, group, partition, topic, member_host, consumer_id`
-The difference between the last produced offset and the last consumed offset for this partition in this topic partition for this group.
+The difference between the last produced offset and the last consumed offset for this partition in this topic
+partition for this group.
 ## Start
 ```bash
-docker run -ti --net="host" -e KAFKA_BROKERS=localhost:9093,localhost:9094,localhost:9095 lechindianer/kafkaex_lag_exporter:0.2.0
+docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -p 4000:4000 lechindianer/kafkaex_lag_exporter:0.2.0
 ```
-Now you can check the exposed metrics at [localhost:4000](localhost:4000).
+Now you can check the exposed metrics at [http://localhost:4000](http://localhost:4000).
+## Configuration
+KafkaExLagExporter uses 5 seconds as default interval to update the lags. If you want to configure it to use another
+value set `KAFKA_EX_INTERVAL_MS`, i.e.
+```bash
+docker run -ti --net="host" -e KAFKA_BROKERS=redpanda:29092 -e KAFKA_EX_INTERVAL_MS=10000 -p 4000:4000 \
+  lechindianer/kafkaex_lag_exporter:0.2.0
+```
 ## Developing
@@ -34,14 +47,14 @@ To start the project locally:
 KAFKA_BROKERS="localhost:9092" iex -S mix
 ```
-There is also a docker-compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
+There is also a Docker compose file included which will start Kafka, serve Kowl (Web UI for Kafka) and start
 KafkaexLagExporter:
 ```bash
-docker-compose up --build
+docker compose up --build
 ```
-Kowl is served at [localhost:8080](localhost:8080).
+Kowl is served at [http://localhost:8080](http://localhost:8080).
 ### Tests
@@ -67,4 +80,5 @@ mix dialyzer
 Source is on [Gitlab](https://gitlab.com/lechindianer/kafkaex-lag-exporter).
-The initial project [Kafka Lag Exporter](https://github.com/seglo/kafka-lag-exporter) was a huge inspiration for me creating my first real Elixir project. Thank you!
+The initial project [Kafka Lag Exporter](https://github.com/seglo/kafka-lag-exporter) was a huge inspiration for me
+creating my first real Elixir project. Thank you!
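The two metrics in the README boil down to simple offset arithmetic: per-partition lag is the last produced offset minus the last committed offset, and the topic sum is the total over all partitions. A minimal sketch in Python, with made-up offset numbers standing in for what the exporter actually fetches from the brokers:

```python
# Hypothetical offsets; the real exporter fetches these via brod.
latest_offsets = {0: 120, 1: 95, 2: 130}     # partition -> last produced offset
committed_offsets = {0: 100, 1: 95, 2: 110}  # partition -> last consumed offset

# kafka_consumergroup_group_lag: one value per partition
partition_lag = {
    part: latest_offsets[part] - committed_offsets[part]
    for part in latest_offsets
}

# kafka_consumergroup_group_topic_sum_lag: sum over all partitions of the topic
topic_sum_lag = sum(partition_lag.values())

print(partition_lag)  # {0: 20, 1: 0, 2: 20}
print(topic_sum_lag)  # 40
```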


@@ -1,3 +0,0 @@
allow_k8s_contexts('default')
include('konvert/Tiltfile')


@@ -1,8 +0,0 @@
kafka:
brokers:
- kafka1:9092
- kafka2:9092
- kafka3:9092
# server:
# listenPort: 8080


@@ -32,7 +32,7 @@ services:
     - redpanda_network
 console:
-  image: docker.redpanda.com/redpandadata/console:v2.2.4
+  image: docker.redpanda.com/redpandadata/console:v2.4.6
   entrypoint: /bin/sh
   command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
   environment:
@@ -63,7 +63,6 @@ services:
   image: quay.io/cloudhut/owl-shop:latest
   networks:
     - redpanda_network
-  #platform: 'linux/amd64'
   environment:
     - SHOP_KAFKA_BROKERS=redpanda:29092
     - SHOP_KAFKA_TOPICREPLICATIONFACTOR=1
@@ -78,7 +77,6 @@ services:
   container_name: connect
   networks:
     - redpanda_network
-  #platform: 'linux/amd64'
   depends_on:
     - redpanda
   ports:
@@ -109,6 +107,8 @@ services:
     - '4000:4000'
   environment:
     - KAFKA_BROKERS=redpanda:29092
+  networks:
+    - redpanda_network
   depends_on:
     - redpanda
   restart: "unless-stopped"


@@ -1,15 +0,0 @@
#!/usr/bin/env bash
kind delete cluster
kind create cluster --config kind.yaml
kubectl wait -A --for=condition=ready pod --field-selector=status.phase!=Succeeded --timeout=15m
kubectl get cm -n kube-system kube-proxy -o yaml | sed 's/maxPerCore.*/maxPerCore: 0/' | kubectl apply -n kube-system -f -
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.7.1/deploy/static/provider/kind/deploy.yaml
LB_IP=$(kubectl get svc -n ingress-nginx ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "address=/kind.cluster/$LB_IP"


@@ -1,24 +0,0 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraPortMappings:
- containerPort: 80
hostPort: 80
protocol: TCP
- containerPort: 443
hostPort: 443
protocol: TCP
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f
- role: worker
# image: kindest/node:v1.26.3@sha256:61b92f38dff6ccc29969e7aa154d34e38b89443af1a2c14e6cfbd2df6419c66f


@@ -1,21 +0,0 @@
k8s_yaml('./redpanda-owl-shop-default-networkpolicy.yaml')
k8s_yaml('./redpanda-persistentvolumeclaim.yaml')
k8s_yaml('./redpanda-deployment.yaml')
k8s_yaml('./redpanda-owl-shop-redpanda-network-networkpolicy.yaml')
k8s_yaml('./redpanda-service.yaml')
k8s_yaml('./connect-service.yaml')
k8s_yaml('./connect-deployment.yaml')
k8s_yaml('./owl-shop-deployment.yaml')
k8s_yaml('./console-deployment.yaml')
k8s_yaml('./console-service.yaml')
k8s_yaml('./console-ingress.yaml')
docker_build('kafka-lag-exporter', './..')
k8s_yaml('./kafka-lag-exporter-deployment.yaml')
k8s_yaml('./kafka-lag-exporter-service.yaml')
k8s_resource(workload='kafka-lag-exporter', port_forwards=4000)


@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: connect
name: connect
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: connect
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: connect
spec:
containers:
- env:
- name: CONNECT_BOOTSTRAP_SERVERS
value: redpanda:29092
- name: CONNECT_CONFIGURATION
value: |
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
group.id=connectors-cluster
offset.storage.topic=_internal_connectors_offsets
config.storage.topic=_internal_connectors_configs
status.storage.topic=_internal_connectors_status
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
offset.flush.interval.ms=1000
producer.linger.ms=50
producer.batch.size=131072
- name: CONNECT_GC_LOG_ENABLED
value: "false"
- name: CONNECT_HEAP_OPTS
value: -Xms512M -Xmx512M
- name: CONNECT_LOG_LEVEL
value: info
image: docker.redpanda.com/redpandadata/connectors:latest
name: connect
ports:
- containerPort: 8083
hostname: connect
restartPolicy: Always


@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: connect
name: connect
spec:
ports:
- name: "8083"
port: 8083
targetPort: 8083
selector:
io.kompose.service: connect


@@ -1,48 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
io.kompose.service: console
name: console
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: console
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: console
spec:
containers:
- args:
- -c
- echo "$(CONSOLE_CONFIG_FILE)" > /tmp/config.yml; /app/console
command:
- /bin/sh
env:
- name: CONFIG_FILEPATH
value: /tmp/config.yml
- name: CONSOLE_CONFIG_FILE
value: |
kafka:
brokers: ["redpanda:29092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda:8081"]
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda:9644"]
connect:
enabled: true
clusters:
- name: local-connect-cluster
url: http://connect:8083
image: docker.redpanda.com/redpandadata/console:v2.2.4
name: console
ports:
- containerPort: 8080
restartPolicy: Always


@@ -1,17 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ingress-myserviceb
spec:
rules:
- host: console.lechindianer.hack
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: console
port:
number: 8080
ingressClassName: nginx


@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: console
name: console
spec:
ports:
- name: "8080"
port: 8080
targetPort: 8080
selector:
io.kompose.service: console


@@ -1,26 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: kafka-lag-exporter
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-default: "true"
io.kompose.service: kafka-lag-exporter
spec:
containers:
- env:
- name: KAFKA_BROKERS
value: redpanda:29092
image: kafka-lag-exporter
name: kafka-lag-exporter
ports:
- containerPort: 4000
restartPolicy: Always


@@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: kafka-lag-exporter
name: kafka-lag-exporter
spec:
ports:
- name: "4000"
port: 4000
targetPort: 4000
selector:
io.kompose.service: kafka-lag-exporter


@@ -1,31 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: owl-shop
name: owl-shop
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: owl-shop
template:
metadata:
creationTimestamp: null
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: owl-shop
spec:
containers:
- env:
- name: SHOP_KAFKA_BROKERS
value: redpanda:29092
- name: SHOP_KAFKA_TOPICREPLICATIONFACTOR
value: "1"
- name: SHOP_TRAFFIC_INTERVAL_DURATION
value: 0.1s
- name: SHOP_TRAFFIC_INTERVAL_RATE
value: "6"
image: quay.io/cloudhut/owl-shop:latest
name: owl-shop
restartPolicy: Always


@@ -1,44 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: redpanda
strategy:
type: Recreate
template:
metadata:
labels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
io.kompose.service: redpanda
spec:
containers:
- args:
- redpanda start
- --smp 1
- --overprovisioned
- --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr 0.0.0.0:8082
- --advertise-pandaproxy-addr localhost:8082
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
name: redpanda
ports:
- containerPort: 8081
- containerPort: 8082
- containerPort: 9092
- containerPort: 9644
- containerPort: 29092
volumeMounts:
- mountPath: /var/lib/redpanda/data
name: redpanda
restartPolicy: Always
volumes:
- name: redpanda
persistentVolumeClaim:
claimName: redpanda


@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-default
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-default: "true"


@@ -1,13 +0,0 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: redpanda-owl-shop-redpanda-network
spec:
ingress:
- from:
- podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"
podSelector:
matchLabels:
io.kompose.network/redpanda-owl-shop-redpanda-network: "true"


@@ -1,12 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Mi


@@ -1,25 +0,0 @@
apiVersion: v1
kind: Service
metadata:
labels:
io.kompose.service: redpanda
name: redpanda
spec:
ports:
- name: "8081"
port: 8081
targetPort: 8081
- name: "8082"
port: 8082
targetPort: 8082
- name: "9092"
port: 9092
targetPort: 9092
- name: "9644"
port: 9644
targetPort: 9644
- name: "29092"
port: 29092
targetPort: 29092
selector:
io.kompose.service: redpanda


@@ -5,8 +5,6 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
   require Logger
-  @interval 5_000
   def start_link(default) when is_list(default) do
     GenServer.start_link(__MODULE__, default, name: __MODULE__)
   end
@@ -18,23 +16,29 @@ defmodule KafkaexLagExporter.ConsumerOffsetRunner do
     clients = Application.get_env(:brod, :clients)
     endpoints = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]
+    interval =
+      System.get_env("KAFKA_EX_INTERVAL_MS", "5000")
+      |> String.to_integer()
     Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
+    Logger.info("Updating lag information every #{interval} milliseconds")
-    Process.send_after(self(), :tick, @interval)
+    Process.send_after(self(), :tick, interval)
-    {:ok, %{endpoints: endpoints}}
+    {:ok, %{endpoints: endpoints, interval: interval}}
   end
   @impl true
   def handle_info(:tick, state) do
     [endpoint | _] = state.endpoints
+    interval = state.interval
     %{lags: lags, sum: lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
     KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
     KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
-    Process.send_after(self(), :tick, @interval)
+    Process.send_after(self(), :tick, interval)
     {:noreply, state}
   end
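This change replaces the hard-coded `@interval 5_000` with a value read from the `KAFKA_EX_INTERVAL_MS` environment variable, falling back to 5000 ms. The same lookup-with-default pattern can be sketched in Python (the helper name is hypothetical, not part of the project):

```python
import os

def read_interval_ms(env=None):
    """Read KAFKA_EX_INTERVAL_MS, defaulting to 5000 ms, as the exporter does."""
    if env is None:
        env = os.environ
    # get() with a string default mirrors System.get_env/2 |> String.to_integer()
    return int(env.get("KAFKA_EX_INTERVAL_MS", "5000"))

print(read_interval_ms({}))                                 # 5000
print(read_interval_ms({"KAFKA_EX_INTERVAL_MS": "10000"}))  # 10000
```

Storing the parsed value in the GenServer state (rather than re-reading the environment on every tick) means the interval is fixed at startup, which matches the diff's `%{endpoints: endpoints, interval: interval}` state.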


@@ -7,8 +7,6 @@ defmodule KafkaexLagExporter.KafkaUtils do
   alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
-  require Logger
   @default_client :client1
   def connection, do: connection(@default_client)
@@ -46,10 +44,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
     group_descriptions
     |> Enum.flat_map(fn %{group_id: consumer_group, members: members} ->
-      get_member_info(members)
-      |> Enum.map(fn {topics, consumer_id, member_host} ->
-        {consumer_group, topics, consumer_id, member_host}
-      end)
+      get_member_info(consumer_group, members)
     end)
   end
@@ -63,7 +58,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
     fetch_committed_offsets(topic, consumer_group, client)
     |> Enum.sort_by(fn {key, _value} -> key end)
-    for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
+    for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
       {part, current - committed}
     end
   end
@@ -75,8 +70,7 @@ defmodule KafkaexLagExporter.KafkaUtils do
     {:ok, partitions_count} = KafkaWrapper.get_partitions_count(client, topic)
     for i <- Range.new(0, partitions_count - 1),
-        {:ok, offset} =
-          KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
+        {:ok, offset} = KafkaWrapper.resolve_offset(endpoints, topic, i, :latest, sock_opts) do
       {i, offset}
     end
   end
@@ -88,23 +82,31 @@ defmodule KafkaexLagExporter.KafkaUtils do
     {:ok, response} = KafkaWrapper.fetch_committed_offsets(endpoints, sock_opts, consumer_group)
-    for r <- response,
-        pr <- r[:partitions],
-        do: {pr[:partition_index], pr[:committed_offset]}
+    for r <- response, pr <- r[:partitions] do
+      {pr[:partition_index], pr[:committed_offset]}
+    end
   end
   @spec get_member_info(
+          binary,
           list(%{client_host: binary, member_assignment: binary, member_id: binary})
         ) ::
-          list({topic_names :: list(binary), consumer_id :: binary, member_host :: binary})
-  defp get_member_info(members) do
-    Enum.map(members, fn %{
-      client_host: member_host,
-      member_assignment: member_assignment,
-      member_id: consumer_id
-    } ->
+          list(
+            {consumer_group :: binary, topics :: list(binary), consumer_id :: binary,
+             member_host :: binary}
+          )
+  defp get_member_info(consumer_group, members) do
+    members
+    |> Enum.map(fn %{
+         client_host: member_host,
+         member_assignment: member_assignment,
+         member_id: consumer_id
+       } ->
       topics = KafkaexLagExporter.TopicNameParser.parse_topic_names(member_assignment)
       {topics, consumer_id, member_host}
     end)
+    |> Enum.map(fn {topics, consumer_id, member_host} ->
+      {consumer_group, topics, consumer_id, member_host}
+    end)
   end
 end


@@ -40,9 +40,7 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
   end
   test "should return the calculated lags" do
-    test_endpoint = {"test endpoint", 666}
-    %{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint)
+    %{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get({"test endpoint", 666})
     assert sum == [
       %ConsumerOffset{


@@ -5,9 +5,7 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
   alias KafkaexLagExporter.KafkaWrapper.Behaviour, as: KafkaWrapper
   alias KafkaexLagExporter.KafkaUtils
-  @test_host "test_host"
-  @test_port "1234"
-  @test_endpoint {@test_host, @test_port}
+  @test_endpoint {"test_host", "1234"}
   @test_sock_opts [ssl: []]
   @test_group_name1 "test-consumer_group1"
   @test_group_name2 "test-consumer_group"
@@ -28,18 +26,9 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
     consumer_info1 = {:a, @test_group_name1, "consumer"}
     consumer_info2 = {:b, @test_group_name2, "something-other"}
-    patch(
-      KafkaWrapper,
-      :list_all_groups,
-      fn _, _ ->
-        [
-          {
-            @test_endpoint,
-            [consumer_info1, consumer_info2]
-          }
-        ]
-      end
-    )
+    patch(KafkaWrapper, :list_all_groups, fn _, _ ->
+      [{@test_endpoint, [consumer_info1, consumer_info2]}]
+    end)
     group_names = KafkaUtils.get_consumer_group_names(@test_endpoint)
@@ -63,27 +52,23 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
     consumer_id = "test_consumer_id"
     member_host = "test_member_host"
-    patch(
-      KafkaWrapper,
-      :describe_groups,
-      fn _, _, _ ->
-        {
-          :ok,
-          [
-            %{
-              group_id: consumer_group_name,
-              members: [
-                %{
-                  client_host: member_host,
-                  member_assignment: member_assignment,
-                  member_id: consumer_id
-                }
-              ]
-            }
-          ]
-        }
-      end
-    )
+    patch(KafkaWrapper, :describe_groups, fn _, _, _ ->
+      {
+        :ok,
+        [
+          %{
+            group_id: consumer_group_name,
+            members: [
+              %{
+                client_host: member_host,
+                member_assignment: member_assignment,
+                member_id: consumer_id
+              }
+            ]
+          }
+        ]
+      }
+    end)
     patch(KafkaexLagExporter.TopicNameParser, :parse_topic_names, fn _ -> [@test_topic_name] end)
@@ -147,13 +132,9 @@ defmodule KafkaexLagExporter.KafkaUtils.Test do
     patch(KafkaWrapper, :resolve_offset, fn _, _, _, _, _ -> {:ok, resolved_offset} end)
-    patch(
-      KafkaWrapper,
-      :fetch_committed_offsets,
-      fn _, _, _ ->
-        {:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
-      end
-    )
+    patch(KafkaWrapper, :fetch_committed_offsets, fn _, _, _ ->
+      {:ok, [%{name: "test name", partitions: [partition_info1, partition_info2]}]}
+    end)
     lag = KafkaUtils.lag(@test_topic_name, consumer_group, client)