For the last few months I’ve been working on a few Elixir applications receiving inputs from different sources: queuing systems, websockets, database notifications. In this post, we’ll look at one possible way to model the architecture of such an application.

Requirements

Our application needs to:

  • process messages whose shape can change over time but that affect the system in the same way. In other words, it has to support different versions of the same client at the same time.
  • provide traceability for an incoming message, so that it can be followed through at all stages.
  • notify different clients of certain events after processing the message.

As an example, here’s a more concrete use case:

  • The application receives an incoming RabbitMQ message: create a report with this data.
  • The application processes the message and creates the resource in the database.
  • Finally, it sends a few different notifications to another message broker (Crossbar) so that they can be piped to any consumer client (browser, mobile, etc.).
  • At every stage, log accordingly, using a request_id parameter included with the first message to trace the flow all the way through.

Approach

We’ll heavily leverage polymorphism.

We can structure the application logic by:

  • Defining a protocol for each step: message decoding, persistence, logging, notification.
  • Defining as many versioned structs as we need along each step.
  • Implementing each protocol for each struct.

If we apply these ideas, the whole flow can be represented as follows:

Protocols diagram

In practice:

  • As a message comes in, we proceed to identify it and package it as V1.Reports.Create
  • We implement the logging and persistence logic for that specific struct
  • After persistence, we return a V1.Reports.Created struct
  • Again, we implement logging and notification for that specific struct

This design allows us to:

  • split the problem into clear areas of responsibility
  • have explicit guarantees each step of the way: every struct has a predictable set of attributes
  • unit/property test every step very easily
  • extend the application by adding more structs, increasing its size while maintaining the same level of complexity

In terms of process layout, here’s how it can look:

Process diagram
  • We have one consumer to process and identify an incoming message;
  • The resulting struct is passed to a worker (checked out from a pool). Using a pool ensures we don’t exceed our database capacity. In this scenario, we have one worker per database connection;
  • After that, we pass the result struct to a notifier pool, which will publish the relevant messages to the other broker;
  • All along the way, we interact with the Logger module.

This setup mirrors the division of responsibilities we outlined in our protocols, to the point that it naturally leads to reusability across different applications.

Code implementation

As it’s impractical to show the entire application in a blog post, we’ll focus on some key areas.

General flow

It doesn’t really matter which queuing system we’re using; we just care about a few key properties in the incoming message:

  • explicit version
  • a request id that we can forward to the rest of the chain
  • a clear directive/topic on what the message is about

For instance, our message could look like this (using JSON as notation):

{
  "meta" : {
    "version" : 1,
    "topic" : "reports",
    "type" : "create",
    "request_id" : "15456d4e-782b-11e5-8bcf-feff819cdc9f"
  },
  "data" : {
    "reference" : "ABC7193",
    "description" : "He's walking in space!",
    "submitted_at" : "2015-10-27T09:23:24Z"
  }
}

The format may change (e.g. the topic could be inferred from the queue name itself), but generally speaking this is a good baseline.

In our consumer logic, we want to aim for something along these lines:

message
  |> identify
  |> populate(message)
  |> log
  |> process
  |> log
  |> acknowledge(client)
  |> notify
  |> log

We can chain all operations and produce a very readable pipeline. We’ll come back to this structure later on when we talk about error handling.

Identifying the message

Thanks to the metadata included in the message, this is relatively easy.

Let’s first define a V1.Reports.Create struct.

defmodule V1.Reports.Create do
  defstruct request_id: nil, data: %{}
end

Version and topic are expressed by the struct name itself, so we don’t need to store them. We add a request_id attribute so that we can keep track of the request flow and a data attribute to store our domain-specific data. Note that, depending on how strict we want to be, we can also opt for something like:

defmodule V1.Reports.Create do
  defstruct request_id: nil,
            reference: nil,
            description: nil,
            submitted_at: nil
end

In our case, we’re gonna stick with data as we need to be more flexible (our reports may have some additional fields and our database can handle documents with different shapes).

To identify the incoming payload, we can write a function like:

def identify(payload) do
  case payload.meta do
    %{"version" => 1, "topic" => "reports", "type" => "create"} -> V1.Reports.Create
    _other -> {:error, :unsupported_payload}
  end
end

We aggressively pattern match to quickly determine the struct to use; this approach scales really well even with a dozen different combinations. Beyond that, we can use metaprogramming to infer the pattern match clause from the list of payload structs available in our codebase (this is left as an exercise for the reader).
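
Sticking with the manual approach, here’s a hypothetical sketch of how a second payload version would slot in (V2.Reports.Create is not part of our example):

def identify(payload) do
  case payload.meta do
    %{"version" => 1, "topic" => "reports", "type" => "create"} -> V1.Reports.Create
    # a hypothetical second version: a new struct with its own protocol implementations
    %{"version" => 2, "topic" => "reports", "type" => "create"} -> V2.Reports.Create
    _other -> {:error, :unsupported_payload}
  end
end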

Populating the struct

To populate the struct, we can define a populate/2 function:

def populate(struct_module, payload) do
  %{"data" => payload["data"],
    "request_id" => payload["meta"]["request_id"]}
  |> Enum.into(Kernel.struct(struct_module, %{}))
end

Using Enum.into/2 assumes that our struct implements the Collectable protocol. As explained in the official docs, we can think about this protocol as the counterpart of Enumerable: where Enumerable defines how to take values out of a data structure, Collectable expresses how to collect values into one.

This means that we need to define the Collectable implementation for V1.Reports.Create:

defimpl Collectable, for: V1.Reports.Create do
  def into(original) do
    {original, fn
        s, {:cont, {k, v}} -> update_struct(s, k, v)
        s, :done -> s
        _, :halt -> :ok
    end}
  end

  defp update_struct(s, k, v) when is_binary(k) do
    update_struct(s, String.to_existing_atom(k), v)
  end
  defp update_struct(s, k, v) do
    Map.put(s, k, v)
  end
end

Our implementation happily accepts a map with either string or atom keys, safely using String.to_existing_atom/1 to handle the conversion.

With this code in place, we can expect to have the following struct as a result:

%V1.Reports.Create{
  request_id: "15456d4e-782b-11e5-8bcf-feff819cdc9f",
  data: %{
    "reference" => "ABC7193",
    "description" => "He's walking in space!",
    "submitted_at" => "2015-10-27T09:23:24Z"
  }
}

Logging (part 1)

To log, we’re gonna leverage the standard Logger library provided by Elixir. The log/1 function we added to our pipeline can be as minimal as:

def log(item) do
  # to_string/1 relies on the String.Chars protocol
  :ok = Logger.info to_string(item)
  item
end

We return the item itself so as not to break the pipeline.

In order for this to work, we have to implement another protocol, String.Chars, which defines how a given type gets converted to a binary. Let’s do that for V1.Reports.Create:

defimpl String.Chars, for: V1.Reports.Create do
  def to_string(create_struct) do
    "type=create status=accepted request_id=#{create_struct.request_id} reference=#{create_struct.data["reference"]}"
  end
end

It’s important here to decide what matters in terms of tracing a payload through our system: while the request_id is a given, we can be flexible about other data depending on security/privacy concerns.

We’ll just log the reference, which will allow us to tie this specific logging event to subsequent persistence events. The use case can be: given that I’m looking at a persisted report and I know its reference, when was it created? What events led to its persistence?
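
With the sample payload above, the resulting log line (ignoring the timestamp and level prefix added by Logger) would look like this:

type=create status=accepted request_id=15456d4e-782b-11e5-8bcf-feff819cdc9f reference=ABC7193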

Processing the struct

The processing step is where most of our business logic resides. It’s intentionally left vague, as its implementation may change dramatically depending on the intent expressed by the struct name. For this purpose, we’ll once again define a protocol to express this variability: Processable.

defprotocol Processable do
  def process(item)
end

To get this to work, we need to do three things:

  • import the protocol where the pipeline is defined: import Processable
  • define its implementation for V1.Reports.Create
  • define V1.Reports.Created, which we’ll return as an expression of successful processing

The V1.Reports.Created struct can look like the following:

defmodule V1.Reports.Created do
  defstruct request_id: nil, record: nil
end

As we can see, it exposes the same request_id attribute we saw before and a record attribute, which will be populated with the struct coming from our database driver. We can now implement the Processable protocol:

defimpl Processable, for: V1.Reports.Create do
  def process(create_struct) do
    {:ok, new_record} = DB.Repo.create(create_struct.data)
    %V1.Reports.Created{request_id: create_struct.request_id, record: new_record}
  end
end

Note that DB.Repo.create is just an example; the database API will change depending on the persistence layer used.
One may wonder why we bother wrapping the persisted record in a struct instead of just returning the record itself. The reason is that along with the persisted data, we need to pass two extra pieces of information: the request_id, which is effectively metadata about the request and not part of the record itself, and the fact that this record has just been created. V1.Reports.Created expresses both with clarity to the rest of the system, particularly to the subsequent steps in the pipeline.
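
Assuming the database driver returns a map with atom keys and an id (the values below are purely illustrative), the struct handed to the next step would look like:

%V1.Reports.Created{
  request_id: "15456d4e-782b-11e5-8bcf-feff819cdc9f",
  record: %{
    id: "562f4d3b9f7bbd6f1a9e6a31",
    reference: "ABC7193",
    description: "He's walking in space!",
    submitted_at: "2015-10-27T09:23:24Z"
  }
}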

Logging (part 2)

Logging a report creation requires repeating the pattern we used before, i.e. implementing the String.Chars protocol for V1.Reports.Created.

defimpl String.Chars, for: V1.Reports.Created do
  def to_string(created_struct) do
    "type=create status=success request_id=#{created_struct.request_id} reference=#{created_struct.record.reference} id=#{created_struct.record.id}"
  end
end

We still key on the same type and set the status to success, as we want to confirm that processing went through. For ease of search, we also log the same reference and the database id. This assumes that both are present in the record that has been persisted by the database. Note that as we define the default value for V1.Reports.Created.record as nil, this implementation assumes a fully populated struct.
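
Continuing the example (with the same made-up id), the success line would be:

type=create status=success request_id=15456d4e-782b-11e5-8bcf-feff819cdc9f reference=ABC7193 id=562f4d3b9f7bbd6f1a9e6a31

Grepping for either the reference or the request_id now returns both events, which is exactly the traceability we were after.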

Acknowledging the original message

At this point, we can safely acknowledge the message to the client that originally queued it. In RabbitMQ, for example, this means that the message can be safely removed from the queue.

Depending on the setup, this step may not be necessary.
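
Assuming we use the amqp client library and pass the channel and delivery tag around as the client argument (both assumptions, as this depends entirely on the consumer setup), a sketch of acknowledge/2 could be:

def acknowledge(item, %{channel: channel, delivery_tag: tag}) do
  # tell RabbitMQ the message has been handled and can be removed from the queue
  AMQP.Basic.ack(channel, tag)
  # return the item itself to keep the pipeline flowing, as log/1 does
  item
end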

Notifications

The last step in our pipeline is notifying another broker that processing has completed.

A notification message is defined by two attributes:
  • a list of topics
  • a payload

In code:

defmodule Message do
  defstruct request_id: nil, topics: [], data: %{}
end

Consequently, we can expose our pubsub layer via a PubSub.publish/1 function which will happily accept Message structs.
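
A rough sketch of that function, with Broker.Client standing in as a placeholder for whatever library actually talks to Crossbar (or any other broker):

defmodule PubSub do
  def publish(%Message{} = message) do
    # publish the payload once per topic; Broker.Client is a placeholder
    Enum.each(message.topics, fn topic ->
      Broker.Client.publish(topic, message.data, request_id: message.request_id)
    end)
    # return the message itself so the final logging step can use it
    message
  end
end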

Lastly, we can create a Notification protocol that will define how to go from a given struct to a Message.
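
The protocol itself is minimal:

defprotocol Notification do
  def process(item)
end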

defimpl Notification, for: V1.Reports.Created do
  def process(created_struct) do
    %Message{
      request_id: created_struct.request_id,
      topics: ["v1.reports"],
      data: Map.from_struct(created_struct.record)
    }
  end
end

The list of topics is hardcoded here, but it can be extracted into its own function if needed.

Finally, let’s go back to the notify/1 function used in the pipeline:

def notify(item) do
  item
  |> Notification.process
  |> PubSub.publish
end

The last step, logging the notification, is identical for all Message structs:

defimpl String.Chars, for: Message do
  def to_string(message_struct) do
    "type=publish request_id=#{message_struct.request_id} topics=#{format_topics(message_struct.topics)}"
  end

  defp format_topics(topics) do
    Enum.join(topics, ",")
  end
end

Another option is to log one line per topic, but for simplicity we’ll skip that.

Process layout

So far we’ve treated this pipeline as a single-threaded flow, but as we outlined before, this need not be the case.

A common scenario is to have:

  • a consumer pool, whose job is picking up a message from the queue and eventually acknowledging its successful handling;
  • a worker pool, hidden behind process/1: the processing operation can assume a valid, well-formed struct and the resulting code will be easier to maintain;
  • a notifier pool, this time hidden behind notify/1. By using a pool we can control the pressure put on the external broker, especially because a single job can trigger many different notifications.

All of these techniques can leverage existing libraries in the BEAM ecosystem, so we won’t cover them here.

Error handling

To improve traceability of our pipeline, we want to be able to clearly log failures along the way, knowing exactly which step failed. It’s theoretically possible to infer this from a stack trace, but it gets unwieldy pretty quickly.

Elixir will introduce a with special form to model a computation dependent on one or more preconditions (see the relevant issue here). For the time being, we can use some macros inspired by the ideas behind Railway oriented programming (see the Elixir-specific blog post here and the theory behind it here).

The core of it is that every step of our pipeline will either return {:ok, result} or {:error, reason}, which allows us either to proceed to the following step or to shortcut out of the pipeline and return an error. In other languages, this idea is formalized at the core level as a monad (e.g. Haskell, where it’s defined as the Either type).
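
As an illustration, a minimal version of such a macro could look like this (an assumed sketch, not the exact code from the referenced post):

defmodule Railway do
  # {:ok, value} feeds value into the next step; {:error, reason} short-circuits
  defmacro left >>> right do
    quote do
      case unquote(left) do
        {:ok, value} -> value |> unquote(right)
        {:error, _reason} = error -> error
      end
    end
  end
end

It needs to be imported wherever the pipeline is defined.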

Let’s revise the main flow accordingly:

{:ok, message}
  >>> identify
  >>> populate(message)
  >>> log
  >>> process
  >>> log
  >>> acknowledge(client)
  >>> notify
  >>> log

Except for the final step, where we log the result and pattern match on either a success or an error tuple, we can replace every |> with >>> and tweak our implementations accordingly.

For example, let’s update identify/1:

def identify(payload) do
  case payload.meta do
    %{"version" => 1, "topic" => "reports", "type" => "create"} -> {:ok, V1.Reports.Create}
    _other -> {:error, :unsupported_payload}
  end
end

We wrapped the positive result in a tuple, a minimal change with a great benefit.

We won’t update all the other functions in the article, as it’s mostly an exercise in copy and paste. As a last consideration, we need to decide if it’s worth capturing the end result of the flow:

def capture_result({:ok, _result}), do: :ok
def capture_result({:error, reason}) do
  # send to an exception app, queue an email, queue an error
end

The implementation here can change a lot, but generally it revolves around the question: “What errors do I want to know about?”. We could potentially ignore some errors and track only others, depending on their importance.

Conclusions

In this post we’ve run through a possible approach for the implementation of an event-driven data-processing pipeline in Elixir. The core ideas behind this implementation are:

  • define clear boundaries between each step by creating versioned structs
  • implement each step as a protocol, so that every versioned struct has a dedicated implementation
  • assign steps to different processes, so that they can be scaled independently depending on the needed capacity
  • abstract the steps (if needed) behind a clear and composable API, so that there’s only one place in the codebase where the entire pipeline is defined.
  • log and trace along the way, always keeping a request_id parameter at hand to connect all steps together

By following this approach, the benefits are:

  • step isolation, which leads to ease of unit testing: everything can be expressed as “Given struct A, I want to return struct B”
  • great traceability
  • composability, so that adding a new step is a matter of repeating a pattern
  • useful abstraction: each step can be refactored or changed in complete isolation
  • ease of scaling, as all the steps run in independent processes
  • ability to grow in size but not in complexity: every time a new event is added, it’s sufficient to add the related protocol implementations