Replace kafka_ex with brod
This commit is contained in:
@@ -7,17 +7,6 @@ defmodule KafkaexLagExporter.Application do
|
||||
|
||||
@impl true
|
||||
def start(_type, _args) do
|
||||
consumer_group_opts = [
|
||||
# setting for the ConsumerGroup
|
||||
heartbeat_interval: 1_000,
|
||||
# this setting will be forwarded to the GenConsumer
|
||||
commit_interval: 1_000
|
||||
]
|
||||
|
||||
gen_consumer_impl = KafkaexLagExporter.ConsumerOffsetsGenConsumer
|
||||
consumer_group_name = "offsets_group"
|
||||
topic_names = ["__consumer_offsets"]
|
||||
|
||||
children = [
|
||||
KafkaexLagExporter.PromEx,
|
||||
# Start the Telemetry supervisor
|
||||
@@ -26,14 +15,7 @@ defmodule KafkaexLagExporter.Application do
|
||||
{Phoenix.PubSub, name: KafkaexLagExporter.PubSub},
|
||||
# Start the Endpoint (http/https)
|
||||
KafkaexLagExporterWeb.Endpoint,
|
||||
# Start a worker by calling: KafkaexLagExporter.Worker.start_link(arg)
|
||||
# {KafkaexLagExporter.Worker, arg}
|
||||
%{
|
||||
id: KafkaEx.ConsumerGroup,
|
||||
start:
|
||||
{KafkaEx.ConsumerGroup, :start_link,
|
||||
[gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]}
|
||||
}
|
||||
KafkaexLagExporter.ConsumerOffsetFetcher
|
||||
]
|
||||
|
||||
# See https://hexdocs.pm/elixir/Supervisor.html
|
||||
|
||||
38
lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex
Normal file
38
lib/kafkaex_lag_exporter/consumer_offset_fetcher.ex
Normal file
@@ -0,0 +1,38 @@
|
||||
defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
|
||||
@moduledoc """
|
||||
Genserver implementation to consume new messages on topic '__consumer_offsets'
|
||||
"""
|
||||
|
||||
use GenServer
|
||||
|
||||
require Logger
|
||||
|
||||
@interval 5_000
|
||||
|
||||
def start_link(default) when is_list(default) do
|
||||
GenServer.start_link(__MODULE__, default)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(_) do
|
||||
Logger.info("Starting #{__MODULE__}")
|
||||
|
||||
clients = Application.get_env(:brod, :clients)
|
||||
endpoints = clients[:kafka_client][:endpoints]
|
||||
|
||||
Logger.info("Reveived Kafka endpoints: #{inspect(endpoints)}")
|
||||
|
||||
Process.send_after(self(), :tick, @interval)
|
||||
{:ok, %{endpoints: endpoints}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:tick, state) do
|
||||
consumer_groups = :brod.list_all_groups(state.endpoints, [])
|
||||
Logger.info("Consumer groups state: #{inspect(consumer_groups)}")
|
||||
|
||||
Process.send_after(self(), :tick, @interval)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
@@ -1,52 +0,0 @@
|
||||
defmodule KafkaexLagExporter.ConsumerOffsetsGenConsumer do
|
||||
@moduledoc """
|
||||
Genserver implementation to consume new messages on topic '__consumer_offsets'
|
||||
"""
|
||||
|
||||
use KafkaEx.GenConsumer
|
||||
|
||||
alias KafkaEx.Protocol.Fetch.Message
|
||||
|
||||
require Logger
|
||||
|
||||
def init(_topic, _partition, _extra_args) do
|
||||
{:ok, %{}}
|
||||
end
|
||||
|
||||
def get() do
|
||||
GenServer.cast(__MODULE__, {:get})
|
||||
end
|
||||
|
||||
def handle_call({:get}, _from, state) do
|
||||
{:reply, state}
|
||||
end
|
||||
|
||||
def handle_call({:push, topic, offset}, _from, state) do
|
||||
new_state = Map.put(state, topic, offset)
|
||||
|
||||
# IO.puts "new state"
|
||||
# IO.inspect new_state
|
||||
|
||||
{:reply, new_state}
|
||||
end
|
||||
|
||||
def handle_message_set(message_set, state) do
|
||||
for %Message{key: key, offset: offset} <- message_set do
|
||||
consumer_group = get_consumer_group(key)
|
||||
|
||||
Logger.info("consumer_group '#{consumer_group}' has offset '#{offset}'}")
|
||||
|
||||
# GenServer.call(__MODULE__, {:push, consumer_group, offset})
|
||||
end
|
||||
|
||||
{:async_commit, state}
|
||||
end
|
||||
|
||||
defp get_consumer_group(<<prefix, version, postfix::binary-size(2), consumer_group::binary>>) do
|
||||
Logger.debug(fn -> "prefix: " <> inspect(prefix) end)
|
||||
Logger.debug(fn -> "version: " <> inspect(version) end)
|
||||
Logger.debug(fn -> "postfix: " <> inspect(postfix) end)
|
||||
|
||||
consumer_group
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user