A Cauldron of Black and White Stones

Peter Minten's blog

Elixir's Enumerable

Sometimes Elixir can be quite a pain. Often it’s because the libraries don’t yet have what you need or because a feature that looks like it’s going to do exactly what you want turns out to be more limited than you would like (|> I’m looking at you). Sometimes however the pain is of a different kind, the kind you get from trying to understand why that seemingly logical Go move is the exact opposite of what’s good, the kind you get from reading about category theory, the kind you get from trying to use the newest kind of lenses in Haskell. In other words the kind of pain that comes from desperately trying to understand something that seems to make no sense at all.

And then, after beating your head against the proverbial desk enough times, you suddenly realize that things are starting to make sense and that it’s actually all quite clear.

Today I had one of those moments. After trying to produce a better stream design based on functions that return a value and a replacement for themselves (think Continuation Passing Style) I ran into a problem where my Iter.to_list was much slower than Enum.to_list. When I asked on IRC @ericmj told me that it’s just not possible to make my iterator approach work fast enough and that that’s the reason Elixir switched to a reduce based approach. He also gave me the excellent tip to look at Clojure’s reducers, which was the inspiration for Elixir’s design.

While the blog posts linked from Clojure’s documentation are informative they are (naturally) about Clojure and not everybody understands that. So here’s an attempt to explain things in an Elixir context.

This post consists of four main parts.

Enumerable, Enum and Stream  

As a user of Elixir you will often use Enum, sometimes use Stream and rarely implement Enumerable. The Enum module is where most of Elixir’s collection manipulation code resides. It has your basic map/2, filter/2 and reduce/3 functions, as well as functions like at/2 and fetch/2 (look up an element by index), drop/2 and min/1. Whenever you want to do something with a collection, look in Enum first. Often the tools you need are there.
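A few of them in action:

Enum.at([10, 20, 30], 1)   # returns 20
Enum.drop([1, 2, 3, 4], 2) # returns [3, 4]
Enum.min([3, 1, 2])        # returns 1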

Sometimes the functions from Enum are somewhat inefficient. Take for example tripling some numbers and filtering out multiples of 7 afterwards (Enum.filter_map doesn’t help here, it filters before mapping). You could write Enum.map(l, &(&1 * 3)) |> Enum.filter(&(rem(&1, 7) != 0)) but there’s an inefficiency: Enum.map constructs a list which is immediately “eaten” by Enum.filter. This work would be unnecessary if Enum.map could somehow pass elements to Enum.filter one by one.

This is where streams come in. By simply writing Stream.map(l, &(&1 * 3)) |> Enum.filter(&(rem(&1, 7) != 0)) the inefficiency is removed. Instead of returning a list Stream.map/2 returns a stream, for which there is an Enumerable implementation, so it can be used with Enum. What Stream.map does is create a stream that wraps an enumerable; whenever an element is requested the stream fetches it from the wrapped enumerable, transforms it with the function given to Stream.map and only then passes it along.
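To see the difference concretely, here are both pipelines side by side. They return the same list, only the amount of intermediate work differs:

l = 1..10
Enum.map(l, &(&1 * 3)) |> Enum.filter(&(rem(&1, 7) != 0))
# builds the whole tripled list first, then filters it
Stream.map(l, &(&1 * 3)) |> Enum.filter(&(rem(&1, 7) != 0))
# triples and filters element by element
# both return [3, 6, 9, 12, 15, 18, 24, 27, 30]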

Because all the Stream functions return streams, sooner or later you’ll need to use Enum functions to get a list or some other value (such as a sum) out of the stream.

It’s actually pretty easy to make your own data structures work with Enum and Stream. For that you need to define your data structure as a record and add an Enumerable implementation for it.

Reduce and Enum  

The reduce function, also known as (left) fold in other languages, is the most direct translation of loops like this:

a = 1
for i in 1..10 do
  a = a * i
end
# a is now 3628800

In Elixir we would write such a loop using Enum.reduce/3 as:

Enum.reduce(1..10, 1, fn (i, acc) ->
  acc * i
end)
# returns 3628800

Simple enough, isn’t it? There’s nothing complicated about reduce; well, not until you look at the implementation of streams.

Reducing works by walking down a data structure (typically a list) and calling a function for each element. The function gets passed the element from the list and a state value (called an accumulator) and is expected to return a new state value, which gets passed to the function the next time it’s called. The initial state value is what you pass to reduce as the second argument. From now on I’ll refer to state values as accumulators.
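To make the accumulator threading concrete, here’s a sum with every call spelled out:

Enum.reduce([1, 2, 3], 0, fn (x, acc) -> x + acc end)
# fun.(1, 0) returns 1
# fun.(2, 1) returns 3
# fun.(3, 3) returns 6
# returns 6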

All functions in Enum are conceptually based on this simple function (more on that later). For example map/2 can be very efficiently expressed using a reduce. Here’s the typical code for map:

def map([], _), do: []
def map([h|t], f), do: [f.(h)|map(t,f)]

Err, wait a minute. That can’t be right. This function isn’t tail recursive and will thus eat much more stack memory than it needs to. This is more realistic:

def map(l, f), do: do_map(l, f, [])

defp do_map([], _f, acc), do: :lists.reverse(acc)
defp do_map([h|t], f, acc), do: do_map(t, f, [f.(h)|acc])

I’ve used :lists.reverse to avoid depending on Enum as the point of this example is how you could do things without Enum. It’s also slightly faster (no overhead from calling a protocol function) and the fact that it’s restricted to lists doesn’t matter as we’re working on lists anyway.

Now here’s how you would write map/2 using Enumerable, so it works for every enumerable:

def map(e, f) do 
  Enumerable.reduce(e, [], fn (x, acc) -> [f.(x)|acc] end) |> :lists.reverse()
end

While superficially different this is actually a lot like the accumulator version of map, with a few differences. Firstly, the recursive call to do_map is gone; instead just the new accumulator is returned. Secondly, there’s no more manual pattern matching to get the first element of the list. Instead the Enumerable.reduce function passes our function the elements, one at a time.

Read that last sentence again, it’s key to understanding the power of reduce. The reduce function is responsible for calling the function we supply with each element. It is responsible for the how of iteration. Our function doesn’t need to know anything about the data structure we’re iterating over as long as there’s a function that can call our function. Don’t call us, we’ll call you.

Another important function in Enum is filter/2. Again the translation is quite obvious:

def filter(l, f), do: do_filter(l, f, [])

defp do_filter([], _f, acc), do: :lists.reverse(acc)
defp do_filter([h|t], f, acc) do
  if f.(h) do
    do_filter(t, f, [h|acc])
  else
    do_filter(t, f, acc)
  end
end

And here’s the version based on Enumerable:

def filter(e, f) do
  Enumerable.reduce(e, [], fn (x, acc) ->
    if f.(x) do
      [x|acc]
    else
      acc
    end
  end) |> :lists.reverse()
end

Just the same transformation as for map.
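Both versions behave the same way:

filter([1, 2, 3, 4], &(rem(&1, 2) == 0)) # returns [2, 4]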

A function like count/1 can be implemented as:

def count(e) do
  Enumerable.reduce(e, 0, fn (_, acc) -> acc + 1 end)
end
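Similarly member?/2 can be expressed as a reduce. Here’s a sketch (note that it keeps walking the enumerable even after it has found a match):

def member?(e, x) do
  Enumerable.reduce(e, false, fn (v, acc) -> acc or v == x end)
end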

In reality Enum.count/1 is a call to Enumerable.count/1, one of the two extra (non-reduce) functions on Enumerable (the other is member?/2). These are probably there to support data structures that have a faster way of computing them than using reduce (like sets). We’ll take a look at creating an Enumerable implementation for a custom data structure in the next section.

As you have seen, all the Enum functions can be easily and efficiently implemented using reduce; reduce is quite general. It’s also not all that hard to implement, as we shall see.

Writing a reduce function  

We’ve seen how you can use reduce/3 to do quite a lot of things. But how do you create a reduce function? Let’s look at a few examples. First, a data structure (a binary tree).

defmodule BinTree do
  defrecord Tree, value: nil, left: nil, right: nil

  def reduce(nil, acc, _), do: acc
  def reduce(Tree[value: value, left: left, right: right], acc, fun) do
    acc1 = fun.(value, acc)
    acc2 = reduce(left, acc1, fun)
    reduce(right, acc2, fun)
  end
end

To reduce a binary tree we’ll first give the value to the passed reducer function and then call reduce on the left and right trees. Now that we have reduce it’s easy to define an Enumerable implementation:

defimpl Enumerable, for: BinTree.Tree do
  def reduce(BinTree.Tree[] = t, acc, fun), do: BinTree.reduce(t, acc, fun)
  def count(BinTree.Tree[] = t) do
    BinTree.reduce(t, 0, fn (_, acc) -> acc + 1 end)
  end
  def member?(BinTree.Tree[] = t, x) do
    BinTree.reduce(t, false, fn (v, acc) -> acc or x == v end)
  end
end

The count/1 and member?/2 functions are required as part of the Enumerable protocol, but you can always define them like I did. They’re mostly useful for when you have a faster way to compute them. For example if the tree has a guarantee that left.value < value < right.value you could use that to avoid examining the whole tree.
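For example, for a hypothetical search tree with that guarantee (not the general tree we defined above) member? could skip half of the remaining tree at every step:

def member?(nil, _x), do: false
def member?(Tree[value: v, left: l, right: r], x) do
  cond do
    x == v -> true
    x < v  -> member?(l, x)
    true   -> member?(r, x)
  end
end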

Let’s test it:

t = BinTree.Tree[value: 1, 
                 left: BinTree.Tree[value: 2],
                 right: BinTree.Tree[value: 3]]
Enum.to_list(t)    # returns [1, 2, 3]
Enum.count(t)      # returns 3
Enum.member?(t, 1) # returns true
Enum.member?(t, 4) # returns false

Ok, time for a more complicated scenario, which I’ll also revisit in the section about streams to demonstrate how Enumerable can be used to create very effective pipelines. Let’s say you have a very big file and want to read it in blocks of 512 bytes. There’s stuff for this in the standard library (File.stream!/2, IO.binstream/2 and more), but for the sake of learning let’s pretend all that isn’t there.

defmodule BlockReader do
  defexception ReadError, message: nil

  def read!(device, block_size // 512) do
    fn acc, fun -> do_read!(device, block_size, acc, fun) end
  end

  defp do_read!(device, block_size, acc, fun) do
    case IO.read(device, block_size) do
      :eof                      -> acc
      {:error, reason}          -> raise ReadError, message: reason
      data when is_binary(data) -> do_read!(device, block_size, fun.(data, acc), fun)
    end
  end
end

Given an open file (i.e. an IO device) this will cut the file up into blocks. It can be used like this:

File.open!("/tmp/some_file") |> BlockReader.read!() |> Enum.to_list()

Quick question to see whether you’ve been paying attention: did you notice that BlockReader.read! works a bit differently than BinTree.reduce?

While BinTree.reduce calls the supplied function directly, BlockReader.read! returns a function that takes an accumulator and a reducer function. So how does this work? Well, Enumerable is implemented for functions. Any function that takes two arguments, an initial accumulator and a reducer function, is a valid Enumerable. The reduce implementation for functions is very simple:

def reduce(function, acc, f), do: function.(acc, f)

In other words when a function gets passed to Enum.reduce all that happens is that the function gets called with the other two arguments of Enum.reduce (namely the accumulator and reducer function).
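To convince yourself this works, here’s a minimal hand-written enumerable (a sketch): a function that “contains” 1, 2 and 3 and simply calls the reducer once per element, threading the accumulator through:

three = fn (acc, fun) ->
  acc1 = fun.(1, acc)
  acc2 = fun.(2, acc1)
  fun.(3, acc2)
end

Enum.to_list(three) # returns [1, 2, 3]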

Besides manually writing reduce functions there are also a few functions in Stream that generate them such as Stream.iterate/2 and Stream.repeatedly/1.
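For example (using Erlang’s :random module for the dice rolls):

Stream.iterate(1, &(&1 * 2)) |> Enum.take(5) # returns [1, 2, 4, 8, 16]
Stream.repeatedly(fn -> :random.uniform(6) end) |> Enum.take(3) # returns three dice rolls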

In the next section we’ll see how streams allow us to put something between a reduction function and the reducer.

Streaming transformations  

It’s easy enough to write some code that computes the line count and character count (grapheme count to be precise) of a file:

File.stream!("my_file") |> Enum.reduce({0, 0}, fn (line, {lines, chars}) -> 
  {lines+1, chars + String.length(line)}
end)

But what if we want something a little more complicated, like, say, ignoring lines that start with ‘#’? We could of course complicate the reducer function, but there’s an alternative.

Streams allow us to stick something (like a map or a filter) in the middle of a reduction pipeline.

File.stream!("my_file")
  |> Stream.reject(&(String.starts_with?(&1, "#")))
  |> Enum.reduce({0, 0}, fn (line, {lines, chars}) -> 
       {lines+1, chars + String.length(line)}
     end)

Here Stream.reject/2 (reject is filter with the condition inverted) very efficiently filters out the lines we don’t want, without constructing a big intermediate list of course.

Streams are easy to use, you just stick them between a generator (like File.stream!) and some call to an Enum function.

Most, if not all, of the time the functions in Stream are all you need to work with streams. But it’s good to know how they work under the hood. Well, it’s a little bit complicated I’m afraid.

Let’s start by looking at the Stream.Lazy record:

defrecord Stream.Lazy, enumerable: nil, fun: nil, acc: nil

Three fields: enumerable, fun and acc. The enumerable field exists because a pipeline like File.stream!("a") |> Stream.map(&some_fun/1) |> Enum.each(&IO.puts/1) is translated (by the |> macro) to Enum.each(Stream.map(File.stream!("a"), &some_fun/1), &IO.puts/1). In other words Stream.map wraps the result of File.stream! (which is a reducer-calling function, and therefore an enumerable, like in our BlockReader implementation). The enumerable field stores the enumerable passed to Stream.map.

The acc field stores the accumulator for your stream; it can be nil if you don’t have an accumulator. The fun field stores your stream function.

The stream function should always accept a reducer function (I’ll call this the “inner” reducer). If you’re using an accumulator (the acc field is specified and not nil) it should also accept something commonly called nesting, which you only need if you want to stop streaming before the input is done (more on that later). So a non-accumulator-using stream function should accept just an inner reducer, while an accumulator-using stream function should accept an inner reducer and a nesting argument. If you consider this confusing, I fully agree.

The stream function should return another function. If you’re using an accumulator the function should accept an entry (from the “outer” generator/stream, so the input you’re working on) and an accumulator for the inner reducer function and return a new accumulator for it. For example here’s how you could implement a stream that adds 1 to each input before passing it along.

def add_one(e) do
  Stream.Lazy[enumerable: e,
              # acc is nil by default
              fun: fn(inner_fun) ->
                fn(entry, inner_acc) -> inner_fun.(entry + 1, inner_acc) end
              end]
end
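Used on an enumerable it should behave just like Stream.map(e, &(&1 + 1)):

add_one([1, 2, 3]) |> Enum.to_list() # returns [2, 3, 4]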

If the accumulator is not nil you get a tuple { inner_acc, my_acc } instead of just inner_acc and should return a similar tuple. Take for example a function that adds successive numbers to each value.

def add_increasing(e) do
  Stream.Lazy[enumerable: e,
              acc: 1,
              fun: fn(inner_fun, _nesting) ->
                fn(entry, { inner_acc, my_acc }) -> 
                  { inner_fun.(entry + my_acc, inner_acc), my_acc + 1 }
                end
              end]
end
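For example:

add_increasing([10, 10, 10]) |> Enum.to_list() # returns [11, 12, 13]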

That’s most of the magic of stream functions. One tiny detail remains. Remember that weird nesting argument? It’s used when you want to abort a stream early (before the input is done). Stream.take uses this, and it depends on throwing a tuple. Please don’t write code that uses this; consider the internal throw an implementation detail that’s not supposed to leak. I’m only showing it to explain how Stream works internally.

def until_second_boom(e) do
  Stream.Lazy[enumerable: e,
              acc: 1,
              fun: fn(inner_fun, nesting) ->
                fn
                  (entry, { inner_acc, 0 }) when entry == "boom" ->
                    throw { :stream_lazy, nesting, inner_acc }
                  (entry, { inner_acc, my_acc }) ->
                    { inner_fun.(entry, inner_acc),
                      (if entry == "boom", do: my_acc - 1, else: my_acc) }
                end
              end]
end

When you run this code with MyMod.until_second_boom([1,2,"boom",3,"boom"]) |> Enum.to_list the result is [1,2,"boom",3].

That’s it for Stream. Please note however that you’re not supposed to write your own streams. If you need something that’s missing from Stream please write a patch for it. That way you won’t run the risk of something breaking if the implementation of Stream is updated.

Wrapping it up

We’ve seen how Enumerable lets you work with all kinds of data structures and custom stream sources (like File.stream!) through a consistent interface. With Enum and Stream, pipelines of transformations can be both expressive and efficient. Yet at the same time there’s great flexibility: you can define your own stream sources and your own stream “consumers”. You can even (though you’re not advised to) define your own stream functions.