Replace Mox with Patch
This commit is contained in:
@@ -19,7 +19,7 @@ defmodule KafkaexLagExporter.KafkaUtils.Behaviour do
|
||||
{host :: atom, port :: non_neg_integer},
|
||||
list(binary),
|
||||
list(binary)
|
||||
) :: list(binary)
|
||||
) :: list({consumer_group :: binary, topics :: list(binary)})
|
||||
|
||||
def connection(client), do: impl().connection(client)
|
||||
|
||||
|
||||
@@ -29,10 +29,10 @@ defmodule KafkaexLagExporter.ConsumerOffset do
|
||||
def handle_info(:tick, state) do
|
||||
[endpoint | _] = state.endpoints
|
||||
|
||||
{consumer_lags, consumer_lag_sum} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
||||
%{sum: lag_sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(endpoint)
|
||||
|
||||
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, consumer_lags)
|
||||
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, consumer_lag_sum)
|
||||
KafkaexLagExporter.Metrics.group_lag_per_partition(endpoint, lags)
|
||||
KafkaexLagExporter.Metrics.group_sum_lag(endpoint, lag_sum)
|
||||
|
||||
Process.send_after(self(), :tick, @interval)
|
||||
|
||||
|
||||
@@ -3,19 +3,24 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
|
||||
|
||||
require Logger
|
||||
|
||||
# TODO: change return type
|
||||
@spec get(KafkaexLagExporter.KafkaWrapper.endpoint()) :: {any(), any()}
|
||||
alias KafkaexLagExporter.KafkaUtils
|
||||
|
||||
# TODO fix type
|
||||
@spec get(KafkaexLagExporter.KafkaWrapper.Behaviour.endpoint()) :: %{
|
||||
lags: list(binary),
|
||||
sum: list(binary)
|
||||
}
|
||||
def get(endpoint) do
|
||||
consumer_group_names = KafkaexLagExporter.KafkaUtils.get_consumer_group_names(endpoint)
|
||||
consumer_group_names = KafkaUtils.get_consumer_group_names(endpoint)
|
||||
|
||||
consumer_lags =
|
||||
KafkaexLagExporter.KafkaUtils.topic_names_for_consumer_groups(
|
||||
KafkaUtils.topic_names_for_consumer_groups(
|
||||
endpoint,
|
||||
[],
|
||||
consumer_group_names
|
||||
)
|
||||
|> Enum.map(fn [consumer_group, topics] ->
|
||||
[consumer_group, get_lag_for_consumer(consumer_group, topics)]
|
||||
|> Enum.map(fn {consumer_group, topics} ->
|
||||
{consumer_group, get_lag_for_consumer(consumer_group, topics)}
|
||||
end)
|
||||
|
||||
consumer_lag_sum = get_lag_for_consumer_sum(consumer_lags)
|
||||
@@ -26,13 +31,13 @@ defmodule KafkaexLagExporter.ConsumerOffsetFetcher do
|
||||
defp get_lag_for_consumer(consumer_group, topics) do
|
||||
topics
|
||||
|> Enum.flat_map(fn topic ->
|
||||
KafkaexLagExporter.KafkaUtils.lag(topic, consumer_group, :client1)
|
||||
KafkaUtils.lag(topic, consumer_group, :client1)
|
||||
end)
|
||||
end
|
||||
|
||||
defp get_lag_for_consumer_sum(lags_per_consumer_group) do
|
||||
lags_per_consumer_group
|
||||
|> Enum.map(fn [topic, lag_per_partition] -> [topic, sum_topic_lag(lag_per_partition, 0)] end)
|
||||
|> Enum.map(fn {topic, lag_per_partition} -> {topic, sum_topic_lag(lag_per_partition, 0)} end)
|
||||
end
|
||||
|
||||
defp sum_topic_lag([], acc), do: acc
|
||||
|
||||
2
mix.exs
2
mix.exs
@@ -39,7 +39,7 @@ defmodule KafkaexLagExporter.MixProject do
|
||||
{:plug_cowboy, "~> 2.6.1"},
|
||||
{:credo, "~> 1.7.5", only: [:dev, :test], runtime: false},
|
||||
{:dialyxir, "~> 1.0", only: [:dev], runtime: false},
|
||||
{:mox, "~> 1.1", only: :test},
|
||||
{:patch, "~> 0.13.0", only: :test},
|
||||
{:brod, "~> 3.17.0"},
|
||||
{:prom_ex, "~> 1.8.0"},
|
||||
{:telemetry, "~> 1.2"}
|
||||
|
||||
2
mix.lock
2
mix.lock
@@ -16,10 +16,10 @@
|
||||
"kafka_protocol": {:hex, :kafka_protocol, "4.1.5", "d15e64994a8ca99716ab47db4132614359ac1bfa56d6c5b4341fdc1aa4041518", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "c956c9357fef493b7072a35d0c3e2be02aa5186c804a412d29e62423bb15e5d9"},
|
||||
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
|
||||
"mint": {:hex, :mint, "1.5.2", "4805e059f96028948870d23d7783613b7e6b0e2fb4e98d720383852a760067fd", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "d77d9e9ce4eb35941907f1d3df38d8f750c357865353e21d335bdcdf6d892a02"},
|
||||
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
|
||||
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
|
||||
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
|
||||
"octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},
|
||||
"patch": {:hex, :patch, "0.13.0", "da48728f9086a835956200a671210fe88f67ff48bb1f92626989886493ac2081", [:mix], [], "hexpm", "d65a840d485dfa05bf6673269b56680e7537a05050684e713de125a351b28112"},
|
||||
"phoenix": {:hex, :phoenix, "1.6.16", "e5bdd18c7a06da5852a25c7befb72246de4ddc289182285f8685a40b7b5f5451", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 1.0 or ~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e15989ff34f670a96b95ef6d1d25bad0d9c50df5df40b671d8f4a669e050ac39"},
|
||||
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"},
|
||||
"phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"},
|
||||
|
||||
47
test/consumer_offset_fetcher_test.exs
Normal file
47
test/consumer_offset_fetcher_test.exs
Normal file
@@ -0,0 +1,47 @@
|
||||
defmodule KafkaexLagExporter.ConsumerOffsetFetcher.Test do
|
||||
use ExUnit.Case, async: true
|
||||
use Patch
|
||||
|
||||
@test_consumer_group_name1 "test_consumer_1"
|
||||
@test_consumer_group_name2 "test_consumer_2"
|
||||
@test_lags [{0, 23}, {1, 42}, {2, 666}]
|
||||
|
||||
setup do
|
||||
patch(
|
||||
KafkaexLagExporter.KafkaUtils,
|
||||
:get_consumer_group_names,
|
||||
fn _ -> [@test_consumer_group_name1, @test_consumer_group_name2] end
|
||||
)
|
||||
|
||||
patch(KafkaexLagExporter.KafkaUtils, :lag, fn _, _, _ -> @test_lags end)
|
||||
|
||||
patch(
|
||||
KafkaexLagExporter.KafkaUtils,
|
||||
:topic_names_for_consumer_groups,
|
||||
fn _, _, _ ->
|
||||
[
|
||||
{@test_consumer_group_name1, ["test_topic_1", "test_topic_2"]},
|
||||
{@test_consumer_group_name2, ["test_topic_3"]}
|
||||
]
|
||||
end
|
||||
)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
test "should return the calculated lags" do
|
||||
test_endpoint = {"test endpoint", 666}
|
||||
|
||||
%{sum: sum, lags: lags} = KafkaexLagExporter.ConsumerOffsetFetcher.get(test_endpoint)
|
||||
|
||||
assert sum == [
|
||||
{@test_consumer_group_name1, 1462},
|
||||
{@test_consumer_group_name2, 731}
|
||||
]
|
||||
|
||||
assert lags == [
|
||||
{@test_consumer_group_name1, @test_lags ++ @test_lags},
|
||||
{@test_consumer_group_name2, @test_lags}
|
||||
]
|
||||
end
|
||||
end
|
||||
@@ -1,8 +0,0 @@
|
||||
defmodule KafkaexLagExporterTest do
|
||||
use ExUnit.Case
|
||||
doctest KafkaexLagExporter
|
||||
|
||||
test "greets the world" do
|
||||
assert KafkaexLagExporter.hello() == :world
|
||||
end
|
||||
end
|
||||
@@ -1,17 +0,0 @@
|
||||
defmodule KafkaexLagExporterWeb.ErrorViewTest do
|
||||
use KafkaexLagExporterWeb.ConnCase, async: true
|
||||
|
||||
# Bring render/3 and render_to_string/3 for testing custom views
|
||||
import Phoenix.View
|
||||
|
||||
test "renders 404.json" do
|
||||
assert render(KafkaexLagExporterWeb.ErrorView, "404.json", []) == %{
|
||||
errors: %{detail: "Not Found"}
|
||||
}
|
||||
end
|
||||
|
||||
test "renders 500.json" do
|
||||
assert render(KafkaexLagExporterWeb.ErrorView, "500.json", []) ==
|
||||
%{errors: %{detail: "Internal Server Error"}}
|
||||
end
|
||||
end
|
||||
@@ -1,34 +0,0 @@
|
||||
defmodule KafkaexLagExporterWeb.ChannelCase do
|
||||
@moduledoc """
|
||||
This module defines the test case to be used by
|
||||
channel tests.
|
||||
|
||||
Such tests rely on `Phoenix.ChannelTest` and also
|
||||
import other functionality to make it easier
|
||||
to build common data structures and query the data layer.
|
||||
|
||||
Finally, if the test case interacts with the database,
|
||||
we enable the SQL sandbox, so changes done to the database
|
||||
are reverted at the end of every test. If you are using
|
||||
PostgreSQL, you can even run database tests asynchronously
|
||||
by setting `use KafkaexLagExporterWeb.ChannelCase, async: true`, although
|
||||
this option is not recommended for other databases.
|
||||
"""
|
||||
|
||||
use ExUnit.CaseTemplate
|
||||
|
||||
using do
|
||||
quote do
|
||||
# Import conveniences for testing with channels
|
||||
import Phoenix.ChannelTest
|
||||
import KafkaexLagExporterWeb.ChannelCase
|
||||
|
||||
# The default endpoint for testing
|
||||
@endpoint KafkaexLagExporterWeb.Endpoint
|
||||
end
|
||||
end
|
||||
|
||||
setup _tags do
|
||||
:ok
|
||||
end
|
||||
end
|
||||
@@ -1,37 +0,0 @@
|
||||
defmodule KafkaexLagExporterWeb.ConnCase do
|
||||
@moduledoc """
|
||||
This module defines the test case to be used by
|
||||
tests that require setting up a connection.
|
||||
|
||||
Such tests rely on `Phoenix.ConnTest` and also
|
||||
import other functionality to make it easier
|
||||
to build common data structures and query the data layer.
|
||||
|
||||
Finally, if the test case interacts with the database,
|
||||
we enable the SQL sandbox, so changes done to the database
|
||||
are reverted at the end of every test. If you are using
|
||||
PostgreSQL, you can even run database tests asynchronously
|
||||
by setting `use KafkaexLagExporterWeb.ConnCase, async: true`, although
|
||||
this option is not recommended for other databases.
|
||||
"""
|
||||
|
||||
use ExUnit.CaseTemplate
|
||||
|
||||
using do
|
||||
quote do
|
||||
# Import conveniences for testing with connections
|
||||
import Plug.Conn
|
||||
import Phoenix.ConnTest
|
||||
import KafkaexLagExporterWeb.ConnCase
|
||||
|
||||
alias KafkaexLagExporterWeb.Router.Helpers, as: Routes
|
||||
|
||||
# The default endpoint for testing
|
||||
@endpoint KafkaexLagExporterWeb.Endpoint
|
||||
end
|
||||
end
|
||||
|
||||
setup _tags do
|
||||
{:ok, conn: Phoenix.ConnTest.build_conn()}
|
||||
end
|
||||
end
|
||||
@@ -1,2 +1 @@
|
||||
Application.ensure_all_started(:mox)
|
||||
ExUnit.start()
|
||||
|
||||
Reference in New Issue
Block a user