diff --git a/lib/consumer_offsets_consumer_group.ex b/lib/consumer_offsets_consumer_group.ex new file mode 100644 index 0000000..11f3cae --- /dev/null +++ b/lib/consumer_offsets_consumer_group.ex @@ -0,0 +1,26 @@ +defmodule ConsumerOffsetsGenConsumer do + use KafkaEx.GenConsumer + + alias KafkaEx.Protocol.Fetch.Message + + require Logger + + def handle_message_set(message_set, state) do + for %Message{key: key, offset: offset} <- message_set do + Logger.info(fn -> "offset: " <> inspect(offset) end) + + consumer_group = get_consumer_group(key) + end + + {:async_commit, state} + end + + defp get_consumer_group(<>) do + Logger.debug(fn -> "prefix: " <> inspect(prefix) end) + Logger.debug(fn -> "version: " <> inspect(version) end) + Logger.debug(fn -> "postfix: " <> inspect(postfix) end) + + consumer_group + end + +end diff --git a/lib/kafkaex_lag_exporter.ex b/lib/kafkaex_lag_exporter.ex index c14cdf0..b2b2d6b 100644 --- a/lib/kafkaex_lag_exporter.ex +++ b/lib/kafkaex_lag_exporter.ex @@ -1,18 +1,27 @@ defmodule KafkaexLagExporter do - @moduledoc """ - Documentation for `KafkaexLagExporter`. - """ + use Application - @doc """ - Hello world. + def start(_type, _args) do + import Supervisor.Spec - ## Examples + consumer_group_opts = [ + # setting for the ConsumerGroup + heartbeat_interval: 1_000, + # this setting will be forwarded to the GenConsumer + commit_interval: 1_000 + ] - iex> KafkaexLagExporter.hello() - :world + gen_consumer_impl = ConsumerOffsetsGenConsumer + consumer_group_name = "offsets_group" + topic_names = ["__consumer_offsets"] - """ - def hello do - :world + children = [ + supervisor( + KafkaEx.ConsumerGroup, + [gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts] + ) + ] + + Supervisor.start_link(children, strategy: :one_for_one) end end diff --git a/mix.exs b/mix.exs index 1ac476a..08b2935 100644 --- a/mix.exs +++ b/mix.exs @@ -14,15 +14,14 @@ defmodule KafkaexLagExporter.MixProject do # Run "mix help compile.app" to learn about applications. def application do [ - extra_applications: [:logger] + extra_applications: [:logger], + mod: { KafkaexLagExporter, [] }, ] end # Run "mix help deps" to learn about dependencies. defp deps do [ - # {:dep_from_hexpm, "~> 0.3.0"}, - # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} {:kafka_ex, "~> 0.12.1"}, ] end