A Cauldron of Black and White Stones

Peter Minten's blog

Parallelism in Elixir

Parallelization comes up more and more now that processors keep gaining cores. In this blog post I will take a look at how Elixir supports parallelism.

First, the usual lecture on parallelism and concurrency. These are often mixed up, probably because traditionally to get parallelism you needed to work with the tools of concurrency (threads and mutexes and all that jazz).

Parallelism is fundamentally about taking something you can do in a sequential (one process/thread) program and making it faster by doing parts of it at the same time. Concurrency, on the other hand, is about making things possible that are impossible in a sequential program.

For example, when you have one function that blocks until it receives input from the keyboard and another that blocks until it receives input from the network, and you want to watch for both, you're going to need multiple threads/processes and thus concurrency (assuming there's no select(2) or something similar available).
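A quick sketch of that situation in Elixir (my illustration, not part of the word count program; the network source is simulated by a timer since the details don't matter here):

parent = self()

spawn(fn -> send(parent, {:keyboard, IO.gets("> ")}) end)
spawn(fn ->
  Process.sleep(1000) # pretend we're blocked on a socket
  send(parent, {:network, "some data"})
end)

# The parent reacts to whichever source delivers first.
receive do
  {:keyboard, line} -> IO.puts("keyboard: #{inspect line}")
  {:network, data}  -> IO.puts("network: #{data}")
end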

On the other hand, it's easy to count the words in a giant file sequentially, in just a single process/thread. But if you can split up the file, have multiple workers each count part of it, and run those workers at the same time (in parallel), you can get your result much faster.
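For reference, the sequential version is just a map and a sum (a sketch of my own; the module and function names are made up for illustration):

defmodule WordCount do
  # Count whitespace-separated words, line by line, in one process.
  def seqcount(lines) do
    lines
    |> Enum.map(fn line -> length(String.split(line, ~r/\s+/, trim: true)) end)
    |> Enum.sum
  end
end

WordCount.seqcount(["A", "B C", ""]) #=> 3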

When working with parallelism you ideally want to convert a sequential algorithm into a parallel one without changing the semantics. Now the bad news: in Elixir this will not work. In Haskell it will. Haskell has deterministic parallelism; Elixir doesn't. Well, technically even Haskell's runtime schedules work non-deterministically (that's simply how computers work), but it's so good at hiding the non-determinism that for all intents and purposes it has deterministic parallelism.

In Elixir we don't have it quite as easy as Haskell programmers do. We have to make do with the tools that Erlang gives us. OK, that's perhaps not the worst fate: Erlang supports concurrency quite well (understatement of the century), and with concurrency we can build parallelism.

A simple parallel word count

Let’s think about parallel word count. How can we implement it using the tools of Elixir? First, consider how the algorithm works:

  1. Divide the input lines into equally sized groups.
  2. For each of the groups, in parallel, count the number of words.
  3. Sum the results.

This is actually quite a well-known pattern: divide-map-reduce. Any time you hear somebody talk about MapReduce, this is the fundamental idea behind it: divide the work, map it over workers, and reduce the results. There's more to it than that, of course, but fundamentally it's this simple.

Now, how can we map (pardon the pun) these steps to Elixir code? Dividing is fairly easy: because addition is commutative and associative (the order in which we sum doesn't matter), we can simply split the lines into N lists like this:

# Distribute lines round-robin over n buckets.
def divide(lines, n), do: do_divide(lines, List.duplicate([], n), [])

# No lines left: the unfilled and filled buckets together are the result.
defp do_divide([], o1, o2), do: o1 ++ o2
# Put the next line into the next bucket, moving that bucket to the "done" pile.
defp do_divide([l|ls], [o|o1], o2), do: do_divide(ls, o1, [[l|o]|o2])
# Every bucket got a line this round: start a new round.
defp do_divide(ls, [], o2), do: do_divide(ls, o2, [])

Running divide([1,2,3,4,5,6,7,8,9,10], 3) gives [[8, 5, 2], [7, 6, 1], [10, 9, 4, 3]], so the lines are neatly divided. OK, the order is weird, but we've already seen that that doesn't matter (addition is commutative). Compared to the obvious solution of slicing the list into equal pieces, we don't need the length, which means this algorithm will be easier to adapt to a form that works with any enumerable. Mostly, though, it's because I learned functional programming in Haskell and this algorithm works well with laziness. :)
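For comparison, the slicing solution might look like this (a sketch of my own; it leans on length/1, and on Enum.chunk_every/2 from more recent Elixir versions):

# Slice the list into n roughly equal consecutive pieces.
def divide_slices(lines, n) do
  size = max(div(length(lines) + n - 1, n), 1) # ceiling division
  Enum.chunk_every(lines, size)
end

Here divide_slices([1,2,3,4,5,6,7,8,9,10], 3) gives [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]: same totals, but it has to walk the whole list once just to measure it.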

For each of these groups of lines we’ll want to spawn a worker.

def parcount_spawn(groups) do
  master = self()
  Enum.each(groups, fn group ->
    spawn(fn ->
      # Gotta love closures: group is captured
      # (If only I could do that so easily for the white Go groups)
      count = Enum.map(group, &length(split_words(&1)))
              |> Enum.reduce(0, &(&1+&2))
      send(master, count)
    end)
  end)
end

defp split_words(""), do: [] # String.split gives [""] in this case
defp split_words(s), do: String.split(s)

Each of the workers counts the words in the lines in its group, sums them, and sends the count to the master process (the one that spawned the workers).

All that remains is to collect the subtotals and sum them.

def parcount_collect(n) do
  Enum.reduce(1..n, 0, fn _, total ->
    receive do
      count -> total + count
    end
  end)
end

Here reducing over a range is a bit of a trick to enter receive a specific number of times.
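One caveat of my own: the bare count pattern matches whatever message arrives first, so this only works because the master receives nothing but counts. If other messages could show up, tagging the counts would be safer, along these lines:

# Worker side would then do: send(master, {:count, count})
def parcount_collect(n) do
  Enum.reduce(1..n, 0, fn _, total ->
    receive do
      {:count, count} -> total + count
    end
  end)
end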

To put it all together:

def parcount(lines, n) do
  groups = divide(lines, n)
  parcount_spawn(groups)
  parcount_collect(n)
end

Calling parcount(["A", "B C", " ", "D D D ", ""], 3) neatly gives us 6.
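As an aside, recent Elixir versions ship a Task module that packages exactly this spawn-and-collect pattern. A sketch of the same word count with it (assuming Task.async/1 and Task.await/1 are available, and reusing divide and split_words from above):

def parcount_tasks(lines, n) do
  lines
  |> divide(n)
  |> Enum.map(fn group ->
    Task.async(fn ->
      group |> Enum.map(&length(split_words(&1))) |> Enum.sum
    end)
  end)
  |> Enum.map(&Task.await/1)
  |> Enum.sum
end

Task.await/1 also handles the tagging and matching that parcount_collect does by hand, which is one less place for stray messages to sneak in.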