Provide metrics on root URL

This commit is contained in:
2022-02-20 22:47:48 +01:00
parent 79e83afafb
commit c8a043528b
23 changed files with 869 additions and 127 deletions

View File

@@ -1,32 +1,13 @@
defmodule KafkaexLagExporter do
@moduledoc """
Supervisor to start the '__consumer__offsets' watcher child
KafkaexLagExporter keeps the contexts that define your domain
and business logic.
Contexts are also responsible for managing your data, regardless
if it comes from the database, an external API or others.
"""
use Application
def start(_type, _args) do
import Supervisor.Spec
consumer_group_opts = [
# setting for the ConsumerGroup
heartbeat_interval: 1_000,
# this setting will be forwarded to the GenConsumer
commit_interval: 1_000
]
gen_consumer_impl = ConsumerOffsetsGenConsumer
consumer_group_name = "offsets_group"
topic_names = ["__consumer_offsets"]
children = [
supervisor(
KafkaEx.ConsumerGroup,
[gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]
)
]
Supervisor.start_link(children, strategy: :one_for_one)
def hello() do
:world
end
end

View File

@@ -0,0 +1,52 @@
defmodule KafkaexLagExporter.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
@impl true
def start(_type, _args) do
consumer_group_opts = [
# setting for the ConsumerGroup
heartbeat_interval: 1_000,
# this setting will be forwarded to the GenConsumer
commit_interval: 1_000
]
gen_consumer_impl = KafkaexLagExporter.ConsumerOffsetsGenConsumer
consumer_group_name = "offsets_group"
topic_names = ["__consumer_offsets"]
children = [
KafkaexLagExporter.PromEx,
# Start the Telemetry supervisor
KafkaexLagExporterWeb.Telemetry,
# Start the PubSub system
{Phoenix.PubSub, name: KafkaexLagExporter.PubSub},
# Start the Endpoint (http/https)
KafkaexLagExporterWeb.Endpoint,
# Start a worker by calling: KafkaexLagExporter.Worker.start_link(arg)
# {KafkaexLagExporter.Worker, arg}
%{
id: KafkaEx.ConsumerGroup,
start:
{KafkaEx.ConsumerGroup, :start_link,
[gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]}
}
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: KafkaexLagExporter.Supervisor]
Supervisor.start_link(children, opts)
end
# Tell Phoenix to update the endpoint configuration
# whenever the application is updated.
@impl true
def config_change(changed, _new, removed) do
KafkaexLagExporterWeb.Endpoint.config_change(changed, removed)
:ok
end
end

View File

@@ -1,5 +1,4 @@
defmodule ConsumerOffsetsGenConsumer do
defmodule KafkaexLagExporter.ConsumerOffsetsGenConsumer do
@moduledoc """
Genserver implementation to consume new messages on topic '__consumer_offsets'
"""
@@ -10,11 +9,34 @@ defmodule ConsumerOffsetsGenConsumer do
require Logger
def init(_topic, _partition, _extra_args) do
{:ok, %{}}
end
def get() do
GenServer.cast(__MODULE__, {:get})
end
def handle_call({:get}, _from, state) do
{:reply, state}
end
def handle_call({:push, topic, offset}, _from, state) do
new_state = Map.put(state, topic, offset)
# IO.puts "new state"
# IO.inspect new_state
{:reply, new_state}
end
def handle_message_set(message_set, state) do
for %Message{key: key, offset: offset} <- message_set do
consumer_group = get_consumer_group(key)
Logger.info("consumer_group '#{consumer_group}' has offset '#{offset}'}")
# GenServer.call(__MODULE__, {:push, consumer_group, offset})
end
{:async_commit, state}
@@ -27,5 +49,4 @@ defmodule ConsumerOffsetsGenConsumer do
consumer_group
end
end

View File

@@ -0,0 +1,76 @@
defmodule KafkaexLagExporter.PromEx do
@moduledoc """
Be sure to add the following to finish setting up PromEx:
1. Update your configuration (config.exs, dev.exs, prod.exs, releases.exs, etc) to
configure the necessary bit of PromEx. Be sure to check out `PromEx.Config` for
more details regarding configuring PromEx:
```
config :kafkaex_lag_exporter, KafkaexLagExporter.PromEx,
disabled: false,
manual_metrics_start_delay: :no_delay,
drop_metrics_groups: [],
grafana: :disabled,
metrics_server: :disabled
```
2. Add this module to your application supervision tree. It should be one of the first
things that is started so that no Telemetry events are missed. For example, if PromEx
is started after your Repo module, you will miss Ecto's init events and the dashboards
will be missing some data points:
```
def start(_type, _args) do
children = [
KafkaexLagExporter.PromEx,
...
]
...
end
```
3. Update your `endpoint.ex` file to expose your metrics (or configure a standalone
server using the `:metrics_server` config options). Be sure to put this plug before
your `Plug.Telemetry` entry so that you can avoid having calls to your `/metrics`
endpoint create their own metrics and logs which can pollute your logs/metrics given
that Prometheus will scrape at a regular interval and that can get noisy:
```
defmodule KafkaexLagExporterWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :kafkaex_lag_exporter
...
plug PromEx.Plug, prom_ex_module: KafkaexLagExporter.PromEx
...
end
```
4. Update the list of plugins in the `plugins/0` function return list to reflect your
application's dependencies. Also update the list of dashboards that are to be uploaded
to Grafana in the `dashboards/0` function.
"""
use PromEx, otp_app: :kafkaex_lag_exporter
alias PromEx.Plugins
@impl true
def plugins do
[
# PromEx built in plugins
Plugins.Application,
Plugins.Beam
# {Plugins.Phoenix, router: KafkaexLagExporterWeb.Router, endpoint: KafkaexLagExporterWeb.Endpoint},
# Plugins.Ecto,
# Plugins.Oban,
# Plugins.PhoenixLiveView,
# Plugins.Absinthe,
# Plugins.Broadway,
# Add your own PromEx metrics plugins
# KafkaexLagExporter.Users.PromExPlugin
]
end
end

View File

@@ -0,0 +1,75 @@
defmodule KafkaexLagExporterWeb do
@moduledoc """
The entrypoint for defining your web interface, such
as controllers, views, channels and so on.
This can be used in your application as:
use KafkaexLagExporterWeb, :controller
use KafkaexLagExporterWeb, :view
The definitions below will be executed for every view,
controller, etc, so keep them short and clean, focused
on imports, uses and aliases.
Do NOT define functions inside the quoted expressions
below. Instead, define any helper function in modules
and import those modules here.
"""
def controller do
quote do
use Phoenix.Controller, namespace: KafkaexLagExporterWeb
import Plug.Conn
alias KafkaexLagExporterWeb.Router.Helpers, as: Routes
end
end
def view do
quote do
use Phoenix.View,
root: "lib/kafkaex_lag_exporter_web/templates",
namespace: KafkaexLagExporterWeb
# Import convenience functions from controllers
import Phoenix.Controller,
only: [get_flash: 1, get_flash: 2, view_module: 1, view_template: 1]
# Include shared imports and aliases for views
unquote(view_helpers())
end
end
def router do
quote do
use Phoenix.Router
import Plug.Conn
import Phoenix.Controller
end
end
def channel do
quote do
use Phoenix.Channel
end
end
defp view_helpers do
quote do
# Import basic rendering functionality (render, render_layout, etc)
import Phoenix.View
import KafkaexLagExporterWeb.ErrorHelpers
alias KafkaexLagExporterWeb.Router.Helpers, as: Routes
end
end
@doc """
When used, dispatch to the appropriate controller/view/etc.
"""
defmacro __using__(which) when is_atom(which) do
apply(__MODULE__, which, [])
end
end

View File

@@ -0,0 +1,45 @@
defmodule KafkaexLagExporterWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :kafkaex_lag_exporter
plug PromEx.Plug, path: "/", prom_ex_module: KafkaexLagExporter.PromEx
# The session will be stored in the cookie and signed,
# this means its contents can be read but not tampered with.
# Set :encryption_salt if you would also like to encrypt it.
@session_options [
store: :cookie,
key: "_kafkaex_lag_exporter_key",
signing_salt: "f/R6/xEO"
]
# socket "/live", Phoenix.LiveView.Socket, websocket: [connect_info: [session: @session_options]]
# Serve at "/" the static files from "priv/static" directory.
#
# You should set gzip to true if you are running phx.digest
# when deploying your static files in production.
plug Plug.Static,
at: "/",
from: :kafkaex_lag_exporter,
gzip: false,
only: ~w(assets fonts images favicon.ico robots.txt)
# Code reloading can be explicitly enabled under the
# :code_reloader configuration of your endpoint.
if code_reloading? do
plug Phoenix.CodeReloader
end
plug Plug.RequestId
plug Plug.Telemetry, event_prefix: [:phoenix, :endpoint]
plug Plug.Parsers,
parsers: [:urlencoded, :multipart, :json],
pass: ["*/*"],
json_decoder: Phoenix.json_library()
plug Plug.MethodOverride
plug Plug.Head
plug Plug.Session, @session_options
plug KafkaexLagExporterWeb.Router
end

View File

@@ -0,0 +1,11 @@
defmodule KafkaexLagExporterWeb.Router do
use KafkaexLagExporterWeb, :router
pipeline :api do
plug :accepts, ["json"]
end
scope "/api", KafkaexLagExporterWeb do
pipe_through :api
end
end

View File

@@ -0,0 +1,50 @@
defmodule KafkaexLagExporterWeb.Telemetry do
@moduledoc false
use Supervisor
import Telemetry.Metrics
def start_link(arg) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end
@impl true
def init(_arg) do
children = [
# Telemetry poller will execute the given period measurements
# every 10_000ms. Learn more here: https://hexdocs.pm/telemetry_metrics
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
# Add reporters as children of your supervision tree.
# {Telemetry.Metrics.ConsoleReporter, metrics: metrics()}
]
Supervisor.init(children, strategy: :one_for_one)
end
def metrics do
[
# Phoenix Metrics
summary("phoenix.endpoint.stop.duration",
unit: {:native, :millisecond}
),
summary("phoenix.router_dispatch.stop.duration",
tags: [:route],
unit: {:native, :millisecond}
),
# VM Metrics
summary("vm.memory.total", unit: {:byte, :kilobyte}),
summary("vm.total_run_queue_lengths.total"),
summary("vm.total_run_queue_lengths.cpu"),
summary("vm.total_run_queue_lengths.io")
]
end
defp periodic_measurements do
[
# A module, function and arguments to be invoked periodically.
# This function must call :telemetry.execute/3 and a metric must be added above.
# {KafkaexLagExporterWeb, :count_users, []}
]
end
end

View File

@@ -0,0 +1,16 @@
defmodule KafkaexLagExporterWeb.ErrorHelpers do
@moduledoc """
Conveniences for translating and building error messages.
"""
@doc """
Translates an error message.
"""
def translate_error({msg, opts}) do
# Because the error messages we show in our forms and APIs
# are defined inside Ecto, we need to translate them dynamically.
Enum.reduce(opts, msg, fn {key, value}, acc ->
String.replace(acc, "%{#{key}}", fn _ -> to_string(value) end)
end)
end
end

View File

@@ -0,0 +1,16 @@
defmodule KafkaexLagExporterWeb.ErrorView do
use KafkaexLagExporterWeb, :view
# If you want to customize a particular status code
# for a certain format, you may uncomment below.
# def render("500.json", _assigns) do
# %{errors: %{detail: "Internal Server Error"}}
# end
# By default, Phoenix returns the status message from
# the template name. For example, "404.json" becomes
# "Not Found".
def template_not_found(template, _assigns) do
%{errors: %{detail: Phoenix.Controller.status_message_from_template(template)}}
end
end