
Elixir Process Architecture or: How I Learned to Stop Worrying and Love to Crash

Let it Crash

“Let it crash.”

That’s what I kept hearing. To be honest, it took me a while to grok this. It wasn't until I designed and implemented a few systems with a focus on a process-based architecture that the philosophy really clicked. Now it's core to my thinking, and it's enabled me to start writing systems with greater fault tolerance that are better thought out, with less code.

Learning to let it crash

Let’s look at a practical example. Recently at work, I wrote an HTTP API that, given a person’s name and company, finds and responds with that person's email address. The system does so by reaching out to a few third-party APIs (like ZoomInfo and Jigsaw) to see if they know the person's email address.

With multiple third-party APIs in play, there is a lot that can go wrong. One of the services could have an outage, an API may change, requests may start timing out, and so on. How can you possibly account for everything? I recently interacted with a system that started concatenating Java errors, formatted as XML, onto the end of a valid JSON response about 25% of the time. Am I supposed to write code to handle that specific situation? I don't think so. And even if I did account for everything, my code would be 75% error handling and 25% actual feature code.

With this in mind, I chose to write this new system in Elixir, for two specific reasons. First, the “let it crash” approach I'll detail leads to a lot less code, and generally, less code is easier to understand than more code. Second, since the requests to the external APIs can be made independently, I might as well make them happen concurrently.

The Architecture

The basic architecture of this part of the system is a race. A request comes in for an email address, and we turn that request into a Person struct. From there, we make multiple concurrent API calls to the third-party services. Whichever service responds with an email address first wins, and that address is used. If another service finds an address after this, its response is ignored. If no email address is found by any of the services, then we'll just respond with a null email.
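The race can be sketched as a standalone toy module (hypothetical `Race` module; `Process.sleep/1` calls stand in for the real third-party API requests). The first reply wins; any later replies are simply left unread, and no reply before the timeout yields nil:

```elixir
defmodule Race do
  def run(services, timeout) do
    parent = self()

    # Spawn one process per "service"; each reports back when done.
    Enum.each(services, fn {name, delay_ms} ->
      spawn(fn ->
        Process.sleep(delay_ms)
        send(parent, {:discovered, name})
      end)
    end)

    # Take the first answer; fall back to nil if nothing arrives in time.
    receive do
      {:discovered, name} -> name
    after
      timeout -> nil
    end
  end
end

Race.run([fast: 10, slow: 200], 1_000)
# => :fast
```

The slow process's message still arrives, but nobody reads it, which mirrors how the real system ignores late responses.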

Basic Data Structures

Before we dive in too deep, I want to create a couple of structs to give some structure to the system's two main data types. Specifically, let's create a Person and a Name.

defmodule Person do  
  defstruct name: nil, email: nil, company: nil
end  
defmodule Name do  
  defstruct given_name: nil, family_name: nil
end  

When a request comes in, we hydrate a Name and a Person with the information we have: namely, the name and company. Our API will respond with a JSON representation of a Person. If we can find an email, we'll fill it in; otherwise, it's null. For example:

{
  "name": {
    "given_name":"John",
    "family_name":"Bohn"
  },
  "email":"john.bohn@alphasights.com",
  "company":"Alphasights"
}

Side note, I don't create structs for every type of data. Many times, a Map or Keyword is good enough. In this case though, these structs are pretty core to the whole system and they'll be passed around a bit, so I like to explicitly define them.
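For completeness, here's how hydrating those structs from incoming request params might look (the params shape is hypothetical, and the struct definitions are repeated so the snippet stands on its own):

```elixir
defmodule Name do
  defstruct given_name: nil, family_name: nil
end

defmodule Person do
  defstruct name: nil, email: nil, company: nil
end

# Hypothetical request params, as decoded from JSON.
params = %{
  "name" => %{"given_name" => "John", "family_name" => "Bohn"},
  "company" => "Alphasights"
}

person = %Person{
  name: %Name{
    given_name: params["name"]["given_name"],
    family_name: params["name"]["family_name"]
  },
  company: params["company"]
}

# person.email stays nil until a discovery service fills it in.
```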

Processes

High level process interactions

[Process interaction diagram]

Conceptually, there are three processes involved. First, we want a process for each API (Jigsaw and ZoomInfo). These processes are responsible for calling the APIs and hydrating the email attribute of a Person struct if an email address is found.

Next, we need something to manage those discovery service processes. It will be responsible for spawning them, monitoring them, and responding with a Person with an email address if an email address was discovered. The monitoring here is extremely important since it is the core of our ability to "let it crash". More on that ahead.

Also, note that for each request to our API, we'll spawn a new Discoverer (and by association, new Jigsaw and ZoomInfo processes). When it's finished, we'll tear it all down. We don't need a pool of processes, and the performance difference is negligible. We have at least hundreds of thousands of processes at our disposal, so we might as well use them. Every app is different, though, so choose what works best for you.

Choosing process types

Now that we have a high-level view of how our processes are going to interact, we need to pick the specific types of processes we want. In Elixir there are a few standard choices: GenServer, Task, and Agent. GenServer is a behavior module and a core piece of Erlang/OTP. Task and Agent, however, are abstractions around processes that are specific to Elixir.

Elixir's documentation does a great job of explaining the various Process abstractions so I'll let them speak for themselves.

GenServer

A behavior module for implementing the server of a client-server relation.

A GenServer is a process as any other Elixir process and it can be used to keep state, execute code asynchronously and so on. The advantage of using a generic server process (GenServer) implemented using this module is that it will have a standard set of interface functions and include functionality for tracing and error reporting. It will also fit into a supervision tree.

Task

Conveniences for spawning and awaiting for tasks.

Tasks are processes meant to execute one particular action throughout their life-cycle, often with little or no communication with other processes. The most common use case for tasks is to compute a value asynchronously:
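The canonical example from the Task docs is the async/await pair; a minimal version:

```elixir
# Compute a value in a separate process, then block until it's ready.
task = Task.async(fn -> 21 * 2 end)
Task.await(task)
# => 42
```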

Agent

Agents are a simple abstraction around state.

Often in Elixir there is a need to share or store state that must be accessed from different processes or by the same process at different points in time.

The Agent module provides a basic server implementation that allows state to be retrieved and updated via a simple API.

In this app, we'll use both Task and GenServer. Discoverer will be a GenServer since we need to keep some state and would like a standard interface. Specifically, we need to keep a list of active discovery service tasks. Honestly, this could've been done with an Agent, but I just like the added standard interface that a GenServer provides. For the Jigsaw and ZoomInfo processes, we'll go with Task. These could be written as GenServers, but the processes will be short-lived and don't hold any state, which makes Task a really good choice. We get the asynchronicity we desire while still having some structure and a protocol to follow. I like to think of Task as a process wrapper: it can take any arbitrary code and make a process out of it.

Writing the First Service

Let's start with some code for dealing with Jigsaw's API so we have something for our Discoverer to manage. First, we create a module where we can drop in the basic settings for this service.

defmodule Jigsaw.Settings do  
  def api_base_url do
    "https://www.jigsaw.com/rest/searchContact.json?"
  end

  def api_token do
    Application.get_env(:jigsaw, :api_token)
  end

  def username do
    Application.get_env(:jigsaw, :username)
  end

  def password do
    Application.get_env(:jigsaw, :password)
  end
end  

Super straightforward: we're just pulling some protected information from the Mix configuration. We could've metaprogrammed something to remove the duplication, but it's so simple it doesn't seem worth it. Next, we write the code that interacts with Jigsaw.

defmodule Jigsaw do  
  import Jigsaw.Settings

  def discover_email(person, discoverer) do
    email = Jigsaw.discover_email(person)

    send(discoverer, {:discovered, %{person | email: email}})
  end

  def discover_email(%Person{name: name, company: company}) do
    query = %{token: api_token,
              firstname: name.given_name,
              lastname: name.family_name,
              companyName: company}

    url = api_base_url <> URI.encode_query(query)

    HTTPotion.get(url, timeout: Application.get_env(:jigsaw, :timeout))
    |> process_response
  end

  defp process_response(%{status_code: 200, body: body}) do
    body
    |> to_string
    |> Poison.decode!
    |> extract_email
  end

  defp extract_email(%{"contacts" => [%{"email" => email} | _]}), do: email
end  

We start by importing the Jigsaw.Settings module so those configuration functions are available to us. Next, we define the entry point of the Jigsaw module, discover_email/2. It takes a Person and the pid of the Discoverer to report back to, delegates to discover_email/1, which builds the query and sends an HTTP request to Jigsaw, and then sends the hydrated Person back to the Discoverer.

process_response/1 is where things start getting interesting. We're leveraging pattern matching to declaratively state that we only care about responses with a 200 status code. Everything else will throw a FunctionClauseError. This is immensely powerful. When combined with a good process-oriented architecture, it forms the core of what, to me, makes Elixir and Erlang's error handling strategy unique. By declaratively stating the happy path (or at least the paths we want to handle explicitly), we're telling the world that this thing is going to crash and you need to be prepared for that.

Finally, we use the same technique in extract_email/1 to ensure that the response body has a key value pair for contacts; that the value for contacts is a List; that there is at least one element in that List; and that the first element has a key value pair for email. All that, and no if statement. Instead, we're just saying if any of those things aren't true, crash the process with a FunctionClauseError.
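To see that crash behavior concretely, here's the same clause in a standalone (hypothetical) module, along with what happens when the data doesn't match:

```elixir
defmodule Extract do
  # The same single happy-path clause as extract_email/1 above.
  def extract_email(%{"contacts" => [%{"email" => email} | _]}), do: email
end

Extract.extract_email(%{"contacts" => [%{"email" => "jane@example.com"}]})
# => "jane@example.com"

# Any other shape raises FunctionClauseError, crashing the calling process:
try do
  Extract.extract_email(%{"contacts" => []})
rescue
  FunctionClauseError -> :crashed
end
# => :crashed
```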

The Discoverer

Now let's build out the functions for handling all the collaboration in the Discoverer module. We'll build this one over a couple of code examples and start with the "client side" functions of the GenServer.

The "Client Side"

defmodule Discoverer do  
  use GenServer

  def start_link(discovery_services \\ [Jigsaw]) do
    GenServer.start_link(__MODULE__, %{discovery_services: discovery_services, tasks: []})
  end

  def discover(pid, person) do
    discovery_timeout = Application.get_env(:email_discovery, :discovery_timeout)

    GenServer.call(pid, {:discover, person}, discovery_timeout)
  end
end  

Pretty standard stuff. We create the start_link/1 function which starts a new GenServer with an empty List that we'll use to keep track of the discovery tasks we're going to spawn. Notice we're also allowing the modules we'll use for discovery to be injected and we provide a sane default. This leads to greater flexibility, and makes testing easier since I don't have to use any mocking. In the actual application, there are tests for all of this stuff, but I've left them out of this article for the sake of brevity.

We also create discover/2, which will be the public interface used to kick off the whole discovery process. Note that we're setting a timeout here from the configuration. We need the timeout for this GenServer call to be higher than that of the discovery tasks' HTTP requests, so that if they do happen to time out, they do so before the Discoverer's GenServer call does.
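In the Mix config that relationship might look something like this (the values are placeholders; the point is that the GenServer call timeout exceeds each service's HTTP timeout):

```elixir
# config/config.exs (hypothetical values)
import Config

config :jigsaw, timeout: 5_000
config :zoominfo, timeout: 5_000

# Must be larger than the HTTP timeouts above, so the discovery tasks
# crash with a timeout before the Discoverer's GenServer.call does.
config :email_discovery, discovery_timeout: 6_000
```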

The "Server Side"

Next, let's create the handler for the call with the {:discover, person} message and spawn the discovery processes.

  def handle_call({:discover, person}, from, state) do
    %{discovery_services: discovery_services} = state

    state = Map.merge(state, %{person: person, from: from, tasks: spawn_discovery(person, discovery_services)})

    {:noreply, state}
  end

  defp spawn_discovery(person, discovery_services) do
    _spawn_discovery(person, discovery_services, [])
  end

  defp _spawn_discovery(_person, [], tasks), do: tasks
  defp _spawn_discovery(person, [service | rest], tasks) do
    {:ok, pid} = Task.start(service, :discover_email, [person, self()])
    task = %Task{pid: pid, ref: Process.monitor(pid)}

    _spawn_discovery(person, rest, tasks ++ [task])
  end

So, handle_call/3 with {:discover, person} calls spawn_discovery/2, which recurses over the discovery service modules that were passed in when we started the GenServer and spawns a Task for each one. That list is returned and merged into the current state. Notice we return {:noreply, state}: we don't reply to the caller yet. Instead, the caller stays blocked until we respond later with GenServer.reply/2 from handle_info/2.

If you're familiar with Task, you may be wondering why we used Task.start/3 instead of Task.async/3, since the latter does the Process.monitor/1 call for us. The reason is that Task.async/3 also links the task to the parent process, in this case the Discoverer that started it. That means if the Task were to crash (which it is designed to do), it would also crash the Discoverer. We don't want that to happen, because we're going to handle crashes differently. So instead we use Task.start/3, avoid creating a link, and monitor the process ourselves.
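The monitor-instead-of-link difference can be demonstrated in a few lines: with a monitor, a crash in the watched process is delivered to us as an ordinary :DOWN message rather than taking our process down too.

```elixir
# Start an unlinked task that immediately crashes, and monitor it.
{:ok, pid} = Task.start(fn -> raise "boom" end)
ref = Process.monitor(pid)

# The crash arrives as a message; our process keeps running.
result =
  receive do
    {:DOWN, ^ref, :process, ^pid, _reason} -> :survived
  after
    1_000 -> :no_message
  end

# result => :survived
```

Even if the task crashes before Process.monitor/1 runs, monitoring a dead pid still delivers a :DOWN message (with reason :noproc), so the receive always fires.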

Now that we've spun up the processes, we need to handle their responses. We'll do that in handle_info/2.

  def handle_info({:discovered, person}, state) do
    %{from: from} = state

    GenServer.reply(from, person)

    {:noreply, state}
  end

  def handle_info({:DOWN, ref, _, _, _}, state) do
    %{tasks: tasks} = state
    state = %{state | tasks: Enum.reject(tasks, &(&1.ref == ref))}

    handle_service_down(state)
  end

  defp handle_service_down(%{tasks: [], from: from, person: person, discovery_services: discovery_services}) do
    GenServer.reply(from, person)

    {:stop, :normal, %{discovery_services: discovery_services}}
  end

  defp handle_service_down(state), do: {:noreply, state}

There are two function clauses that match handle_info/2. One matches {:discovered, person}, while the other catches crashed tasks. If we receive {:discovered, person}, a discovery service has found an email address and merged it into our Person, so we reply to the original caller. If we receive {:DOWN, ref, ...}, a discovery service task has crashed, so we remove it from our tasks list and pass the new state to handle_service_down/1. If no {:discovered, person} message is ever received, this continues until the tasks list is empty, at which point we finally reply with the same Person that was initially passed in (with a null email address value).
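The deferred-reply pattern used here ({:noreply, ...} from handle_call/3, then GenServer.reply/2 later from handle_info/2) can be boiled down to a minimal standalone example. The module name and messages are hypothetical:

```elixir
defmodule Deferred do
  use GenServer

  def init(state), do: {:ok, state}

  def handle_call(:ask, from, state) do
    # Don't reply yet; just remember who asked.
    {:noreply, Map.put(state, :from, from)}
  end

  def handle_info({:answer, value}, %{from: from} = state) do
    # Now unblock the caller.
    GenServer.reply(from, value)
    {:noreply, state}
  end
end

{:ok, pid} = GenServer.start(Deferred, %{})

spawn(fn ->
  Process.sleep(50)
  send(pid, {:answer, 42})
end)

GenServer.call(pid, :ask)
# => 42
```

The caller of GenServer.call/2 blocks the whole time, but the server stays free to process other messages while the answer is pending, which is exactly what lets the Discoverer juggle multiple task results.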

When it's all put together, our final module looks like the following.

defmodule Discoverer do  
  use GenServer

  def start(discovery_services \\ [ZoomInfo, Jigsaw]) do
    GenServer.start(__MODULE__, %{discovery_services: discovery_services,
                                              tasks: []})
  end

  def discover(pid, person) do
    discovery_timeout = Application.get_env(:email_discovery, :discovery_timeout)

    GenServer.call(pid, {:discover, person}, discovery_timeout)
  end

  def discovery_services(pid) do
    GenServer.call(pid, :discovery_services)
  end

  def handle_call(:discovery_services, _, state) do
    {:reply, state[:discovery_services], state}
  end

  def handle_call({:discover, person}, from, state) do
    %{discovery_services: discovery_services} = state

    state = Map.merge(state, %{person: person, from: from, tasks: spawn_discovery(person, discovery_services)})

    {:noreply, state}
  end

  def handle_info({:discovered, person}, state) do
    %{from: from} = state

    GenServer.reply(from, person)

    {:noreply, state}
  end

  def handle_info({:DOWN, ref, _, _, _}, state) do
    %{tasks: tasks} = state
    state = %{state | tasks: Enum.reject(tasks, &(&1.ref == ref))}

    handle_service_down(state)
  end

  defp handle_service_down(%{tasks: [], from: from, person: person, discovery_services: discovery_services}) do
    GenServer.reply(from, person)

    {:stop, :normal, %{discovery_services: discovery_services}}
  end

  defp handle_service_down(state), do: {:noreply, state}

  defp spawn_discovery(person, discovery_services) do
    _spawn_discovery(person, discovery_services, [])
  end

  defp _spawn_discovery(_person, [], tasks), do: tasks
  defp _spawn_discovery(person, [service | rest], tasks) do
    {:ok, pid} = Task.start(service, :discover_email, [person, self()])
    task = %Task{pid: pid, ref: Process.monitor(pid)}

    _spawn_discovery(person, rest, tasks ++ [task])
  end
end  

Great! Now all that's left for us to do is create the ZoomInfo module and add it as a default discovery service in Discoverer.start/1.

defmodule ZoomInfo do  
  import ZoomInfo.Settings

  def discover_email(person, discoverer) do
    %{"Email" => email} = ZoomInfo.discover_email(person)

    send(discoverer, {:discovered, %{person | email: email}})
  end

  def discover_email(%Person{name: name, company: company}) do
    query = %{api_token: api_token,
              firstName: name.given_name,
              lastName: name.family_name,
              companyName: company}

    url = api_base_url <> URI.encode_query(query)

    HTTPotion.get(url, timeout: Application.get_env(:zoominfo, :timeout))
    |> process_response
  end

  defp process_response(%{status_code: 200, body: body}) do
    body
    |> to_string
    |> Poison.decode!
    |> extract_email
  end

  defp extract_email(response) do
    # some parsing semantics
  end
end  

You can see that it starts off a lot like Jigsaw. The only real difference in the request phase is the key names in the query. Where they diverge further is in how the email is extracted. In the ZoomInfo API, there is a PersonRecord key whose value can be a single record or a list of records. To handle both, I'm again using pattern matching, with one function clause per shape, to declare what to do in either case.
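A sketch of what those clauses might look like (hypothetical module name, and assuming the PersonRecord shape described above). Both shapes are normalized to a single record, which the %{"Email" => email} match in discover_email/2 then picks apart; anything else crashes the task, as intended:

```elixir
defmodule ZoomInfoExtract do
  # A list of records: take the first one.
  def extract_email(%{"PersonRecord" => [record | _]}), do: record

  # A single record: return it as-is.
  def extract_email(%{"PersonRecord" => %{} = record}), do: record
end
```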

Now, we just change Discoverer.start/1 to also use ZoomInfo as a default discovery module, and we're done!

def start(discovery_services \\ [ZoomInfo, Jigsaw]) do  
  GenServer.start(__MODULE__, %{discovery_services: discovery_services, tasks: []})
end

Takeaways

  • Use pattern matching to declare your happy (or handled) paths.
  • Think about potential errors up front (things that wouldn't match).
  • You're not going to be able to think of everything, so handle errors in a generic way (by crashing).
  • Gracefully handle process crashes via monitoring.
