Previously on Elixir Basics, we explored using GenServer to set up multiple workers that would print a message out at a random interval. Today, I’m going to expand on it by adding AMQP to the mix, having our workers publish messages on an interval and add a solitary consumer process to consume the messages.
Overview
If you recall from last time, we had a simple setup of an application, a supervisor, and N workers. Now we expand on this by associating a publisher with each worker that will publish messages to a RabbitMQ exchange and a single consumer that reads the messages off the queue and prints the contents out. Here is a simple diagram of our approach.
Pre-Requisites
For this exercise, we’ll need to add a few new artifacts to our project. First up, let’s add RabbitMQ via docker-compose.
services:
rabbitmq:
image: rabbitmq:management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
networks:
- amqp_network
restart: unless-stopped
networks:
amqp_network:
driver: bridge
This will set up RabbitMQ with the management console running at https://localhost:15672.
We will also add the amqp dependency to our mix.exs as well as jason for JSON serialization (not technically required, but useful for demonstration purposes).
defp deps do
[
{:uuid, "~> 1.1"},
{:amqp, "~> 3.3"},
{:jason, "~> 1.4"}
]
end
Adapting Our Interval Workers
The workers themselves will remain mostly unchanged; the key addition is a new module named HelloSupervisor.Publisher
to encapsulate all of our AMQP publisher logic.
# lib/hello_supervisor/worker.ex
defmodule HelloSupervisor.Worker do
use GenServer
require Logger
alias HelloSupervisor.Publisher
@min_interval 500
@max_interval 5_000
def start_link(opts) do
GenServer.start_link(__MODULE__, :ok, opts)
end
@impl true
def init(:ok) do
{:ok, publisher} = HelloSupervisor.Publisher.start_link()
schedule_work()
{:ok, %{publisher: publisher}}
end
@impl true
def handle_info(:say_hello, state) do
Logger.info("Hello from process #{inspect(self())}")
message = %{id: UUID.uuid4(), worker_id: inspect(self())}
Publisher.publish(state.publisher, message)
schedule_work()
{:noreply, state}
end
def schedule_work() do
Process.send_after(self(), :say_hello, random_interval(@max_interval))
end
defp random_interval(max) do
:rand.uniform(max - @min_interval) + @min_interval
end
end
This implementation is a bit naive in the sense we spawn a publisher per worker during initialization. If we wanted all five workers to use the same publisher process, we could push the start_link
call further up the supervision tree.
The implementation of handle_info/2
changes in that we create a map of a random UUID and the worker process id, then pass it along to Publisher.publish/2
to handle actually publishing the message.
Publisher Implementation
For the sake of brevity, this implementation takes some shortcuts that convey some poor design choices. Exchanges, queues, and bindings are all created within the module, and there is a lack of proper error handling. We implement this module as another GenServer with both client and server callbacks.
defmodule HelloSupervisor.Publisher do
use GenServer
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, :ok, opts)
end
def publish(server, message) when is_map(message) do
GenServer.call(server, {:publish, message})
end
# Server Callbacks
@impl true
def init(:ok) do
case setup_rabbitmq() do
{:ok, conn, chan} ->
{:ok, %{conn: conn, chan: chan}}
{:error, reason} ->
{:stop, reason}
end
end
@impl true
def handle_call({:publish, message}, _from, state) do
message_with_pid = Map.put(message, :process_id, inspect(self()))
json_message = Jason.encode!(message_with_pid)
result = AMQP.Basic.publish(
state.chan,
"test_exchange",
"",
json_message
)
{:reply, result, state}
end
@impl true
def terminate(_reason, state) do
AMQP.Connection.close(state.conn)
end
defp setup_rabbitmq do
with {:ok, conn} <- AMQP.Connection.open(),
{:ok, chan} <- AMQP.Channel.open(conn) do
AMQP.Queue.declare(chan, "test_queue")
AMQP.Exchange.declare(chan, "test_exchange")
AMQP.Queue.bind(chan, "test_queue", "test_exchange")
{:ok, conn, chan}
end
end
end
This tacks on the process id of the publisher as well so we can see the pairings of both worker and publisher processes in the messages that are published.
At this point, if we run iex -S mix
we will see the same output as last time, but if we navigate to the RabbitMQ management console we will see the test_queue
beginning to fill up. If we inspect a few of those messages, we should see them in the expected format.
Something of note is that for each message, the publisher process id is sequential with the worker process id. This makes sense as each worker spawns a new process for the publisher during initialization.
Adding a Consumer
To tie it all together, let’s add a consumer to the mix to consume the queue and print the contents of each message to the console. The amqp hex documentation gives a pretty comprehensive example of implementing a Consumer GenServer, and our implementation will basically follow this example.
# lib/hello_supervisor/consumer.ex
defmodule HelloSupervisor.Consumer do
use GenServer
require Logger
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, :ok, opts)
end
@impl true
def init(:ok) do
send(self(), :connect)
{:ok, %{conn: nil, chan: nil}}
end
@impl true
def handle_info(:connect, _state) do
case connect() do
{:ok, conn, chan} ->
Process.monitor(conn.pid)
{:noreply, %{conn: conn, chan: chan}}
{:error, _} ->
Logger.error("Failed to connect to RabbitMQ. Retrying in 5 seconds...")
Process.send_after(self(), :connect, 5000)
{:noreply, %{conn: nil, chan: nil}}
end
end
def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, chan) do
{:noreply, chan}
end
@impl true
def handle_info({:DOWN, _, :process, _pid, reason}, _) do
Logger.error("RabbitMQ connection lost: #{inspect(reason)}. Reconnecting...")
send(self(), :connect)
{:noreply, %{conn: nil, chan: nil}}
end
def handle_info({:basic_deliver, payload, %{delivery_tag: _tag, redelivered: _redelivered}}, chan) do
# You might want to run payload consumption in separate Tasks in production
Logger.info("Received message: #{payload}")
{:noreply, chan}
end
def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, chan) do
{:stop, :normal, chan}
end
# Confirmation sent by the broker to the consumer process after a Basic.cancel
def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, chan) do
{:noreply, chan}
end
defp connect do
case AMQP.Connection.open() do
{:ok, conn} ->
case AMQP.Channel.open(conn) do
{:ok, chan} ->
AMQP.Queue.declare(chan, "test_queue")
AMQP.Basic.consume(chan, "test_queue", nil, no_ack: true)
{:ok, conn, chan}
{:error, _} = error -> error
end
{:error, _} = error -> error
end
end
@impl true
def terminate(_reason, %{conn: conn}) do
if conn, do: AMQP.Connection.close(conn)
end
end
Finally, for the last portion, we need to add this to the supervisor tree. The naive solution would be to add it to our existing supervisor where we spawn our publishers.
# lib/hello_supervisor/supervisor.ex
defmodule HelloSupervisor.Supervisor do
use Supervisor
require UUID
def start_link(_args) do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(:ok) do
publishers = Enum.map(1..5, fn _ ->
%{
id: UUID.uuid4(:default),
start: {HelloSupervisor.Worker, :start_link, [[]]},
restart: :permanent
}
end)
consumers = [
HelloSupervisor.Consumer,
]
Supervisor.init(publishers ++ consumers, strategy: :one_for_one)
end
end
Now when we start the program, we should see both publish and consumption messages being logged out to the console.
If we go back to our RabbitMQ console, we will see on the connections panel that there are six active connections (five workers and one consumer). Can you guess which one is the consumer?
For Next Time
I took a lot of shortcuts in this demonstration to keep things clean and simple. There are several defects lurking that may be a good exercise for the reader.
Next time, I will clean this code up a bit and use the :runtime_tools
to dig into the process tree and ensure some better fault tolerance is in place.
As always, you can find the code for this blog post under my Elixir Learning github repository. Have you worked with any eventing systems in Elixir, or perhaps even used more higher level frameworks like Broadway or Oban? Sound off below!