Previously on Elixir Basics, we explored using GenServer to set up multiple workers that would print a message out at a random interval. Today, I’m going to expand on it by adding AMQP to the mix, having our workers publish messages on an interval and add a solitary consumer process to consume the messages.

Overview

If you recall from last time, we had a simple setup of an application, a supervisor, and N workers. Now we expand on this by associating a publisher with each worker that will publish messages to a RabbitMQ exchange and a single consumer that reads the messages off the queue and prints the contents out. Here is a simple diagram of our approach.

Pre-Requisites

For this exercise, we’ll need to add a few new artifacts to our project. First up, let’s add RabbitMQ via docker-compose.

services:
  rabbitmq:
    image: rabbitmq:management
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    networks:
      - amqp_network
    restart: unless-stopped

networks:
  amqp_network:
    driver: bridge

This will set up RabbitMQ with the management console running at https://localhost:15672.

We will also add the amqp dependency to our mix.exs as well as jason for JSON serialization (not technically required, but useful for demonstration purposes).

  defp deps do
    [
      {:uuid, "~> 1.1"},
      {:amqp, "~> 3.3"},
      {:jason, "~> 1.4"}
    ]
  end

Adapting Our Interval Workers

The workers themselves will remain mostly unchanged; the key addition is a new module named HelloSupervisor.Publisher to encapsulate all of our AMQP publisher logic.

# lib/hello_supervisor/worker.ex
defmodule HelloSupervisor.Worker do
  use GenServer
  require Logger
  alias HelloSupervisor.Publisher
  @min_interval 500
  @max_interval 5_000

  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    {:ok, publisher} = HelloSupervisor.Publisher.start_link()

    schedule_work()
    {:ok, %{publisher: publisher}}
  end

  @impl true
  def handle_info(:say_hello, state) do
    Logger.info("Hello from process #{inspect(self())}")
    message = %{id: UUID.uuid4(), worker_id: inspect(self())}
    Publisher.publish(state.publisher, message)
    schedule_work()
    {:noreply, state}
  end

  def schedule_work() do
    Process.send_after(self(), :say_hello, random_interval(@max_interval))
  end

  defp random_interval(max) do
    :rand.uniform(max - @min_interval) + @min_interval
  end
end

This implementation is a bit naive in the sense we spawn a publisher per worker during initialization. If we wanted all five workers to use the same publisher process, we could push the start_link call further up the supervision tree.

The implementation of handle_info/2 changes in that we create a map of a random UUID and the worker process id, then pass it along to Publisher.publish/2 to handle actually publishing the message.

Publisher Implementation

For the sake of brevity, this implementation takes some shortcuts that convey some poor design choices. Exchanges, queues, and bindings are all created within the module, and there is a lack of proper error handling. We implement this module as another GenServer with both client and server callbacks.

defmodule HelloSupervisor.Publisher do
  use GenServer

  # Client API

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  def publish(server, message) when is_map(message) do
    GenServer.call(server, {:publish, message})
  end

  # Server Callbacks

  @impl true
  def init(:ok) do
    case setup_rabbitmq() do
      {:ok, conn, chan} ->
        {:ok, %{conn: conn, chan: chan}}
      {:error, reason} ->
        {:stop, reason}
    end
  end

  @impl true
  def handle_call({:publish, message}, _from, state) do
    message_with_pid = Map.put(message, :process_id, inspect(self()))
    json_message = Jason.encode!(message_with_pid)

    result = AMQP.Basic.publish(
      state.chan,
      "test_exchange",
      "",
      json_message
    )

    {:reply, result, state}
  end

  @impl true
  def terminate(_reason, state) do
    AMQP.Connection.close(state.conn)
  end

  defp setup_rabbitmq do
    with {:ok, conn} <- AMQP.Connection.open(),
         {:ok, chan} <- AMQP.Channel.open(conn) do
      AMQP.Queue.declare(chan, "test_queue")
      AMQP.Exchange.declare(chan, "test_exchange")
      AMQP.Queue.bind(chan, "test_queue", "test_exchange")
      {:ok, conn, chan}
    end
  end
end

This tacks on the process id of the publisher as well so we can see the pairings of both worker and publisher processes in the messages that are published.

At this point, if we run iex -S mix we will see the same output as last time, but if we navigate to the RabbitMQ management console we will see the test_queue beginning to fill up. If we inspect a few of those messages, we should see them in the expected format.

Two AMQP messages, containing a uuid for the id, worker_id and process_id

Something of note is that for each message, the publisher process id is sequential with the worker process id. This makes sense as each worker spawns a new process for the publisher during initialization.

Adding a Consumer

To tie it all together, let’s add a consumer to the mix to consume the queue and print the contents of each message to the console. The amqp hex documentation gives a pretty comprehensive example of implementing a Consumer GenServer, and our implementation will basically follow this example.

# lib/hello_supervisor/consumer.ex

defmodule HelloSupervisor.Consumer do
  use GenServer
  require Logger

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    send(self(), :connect)
    {:ok, %{conn: nil, chan: nil}}
  end

  @impl true
  def handle_info(:connect, _state) do
    case connect() do
      {:ok, conn, chan} ->
        Process.monitor(conn.pid)
        {:noreply, %{conn: conn, chan: chan}}
      {:error, _} ->
        Logger.error("Failed to connect to RabbitMQ. Retrying in 5 seconds...")
        Process.send_after(self(), :connect, 5000)
        {:noreply, %{conn: nil, chan: nil}}
    end
  end

  def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, chan) do
    {:noreply, chan}
  end

  @impl true
  def handle_info({:DOWN, _, :process, _pid, reason}, _) do
    Logger.error("RabbitMQ connection lost: #{inspect(reason)}. Reconnecting...")
    send(self(), :connect)
    {:noreply, %{conn: nil, chan: nil}}
  end

  def handle_info({:basic_deliver, payload, %{delivery_tag: _tag, redelivered: _redelivered}}, chan) do
    # You might want to run payload consumption in separate Tasks in production
    Logger.info("Received message: #{payload}")
    {:noreply, chan}
  end

  def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, chan) do
    {:stop, :normal, chan}
  end

  # Confirmation sent by the broker to the consumer process after a Basic.cancel
  def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, chan) do
    {:noreply, chan}
  end

  defp connect do
    case AMQP.Connection.open() do
      {:ok, conn} ->
        case AMQP.Channel.open(conn) do
          {:ok, chan} ->
            AMQP.Queue.declare(chan, "test_queue")
            AMQP.Basic.consume(chan, "test_queue", nil, no_ack: true)
            {:ok, conn, chan}
          {:error, _} = error -> error
        end
      {:error, _} = error -> error
    end
  end

  @impl true
  def terminate(_reason, %{conn: conn}) do
    if conn, do: AMQP.Connection.close(conn)
  end
end

Finally, for the last portion, we need to add this to the supervisor tree. The naive solution would be to add it to our existing supervisor where we spawn our publishers.

# lib/hello_supervisor/supervisor.ex
defmodule HelloSupervisor.Supervisor do
  use Supervisor
  require UUID

  def start_link(_args) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    publishers = Enum.map(1..5, fn _ ->
      %{
        id: UUID.uuid4(:default),
        start: {HelloSupervisor.Worker, :start_link, [[]]},
        restart: :permanent
      }
    end)

    consumers = [
      HelloSupervisor.Consumer,
    ]

    Supervisor.init(publishers ++ consumers, strategy: :one_for_one)
  end
end

Now when we start the program, we should see both publish and consumption messages being logged out to the console.

If we go back to our RabbitMQ console, we will see on the connections panel that there are six active connections (five workers and one consumer). Can you guess which one is the consumer?

For Next Time

I took a lot of shortcuts in this demonstration to keep things clean and simple. There are several defects lurking that may be a good exercise for the reader. Next time, I will clean this code up a bit and use the :runtime_tools to dig into the process tree and ensure some better fault tolerance is in place.

As always, you can find the code for this blog post under my Elixir Learning github repository. Have you worked with any eventing systems in Elixir, or perhaps even used more higher level frameworks like Broadway or Oban? Sound off below!

comments powered by Disqus