CivilCode Labs

CivilCode Labs is the software development blog for CivilCode.

CivilCode is a team of business software developers with expertise in web applications, process automation, and complex workflows. We develop custom applications that solve complex business process problems.

Our primary development platform is Elixir, and we’re active in the Montréal and worldwide Elixir communities. Elixir is a dynamic, functional language designed for building scalable and maintainable applications.

3 May 2018

Streaming remote files with Elixir

by Nicolas Charlery

Context: We want to provide a file to a user from a remote S3 bucket.

Here are some options we have:

- load the whole file into memory, then send it to the user;
- download the file to the local file system, then serve it from there;
- stream the file from the remote server to the user, chunk by chunk.

With the first option, what if the file to serve is big, or if multiple users try to download it simultaneously? The combined demand could easily saturate the memory of the server.

Storing a file on the file system brings the following issues:

- disk space has to be provisioned and monitored;
- temporary files have to be cleaned up once they are served;
- the file system is one more external dependency that can fail.

Ultimately, we should adopt a defensive strategy and consider services external to the application boundaries as unreliable (e.g., databases, file systems, public APIs, …). In this blog post, we will explore the last option: streaming the file.

Requirements

In order to act as a middleman and stream the remote resource, the remote server must implement the chunked transfer encoding mechanism described in HTTP/1.1. Note: HTTP/2 provides a different mechanism for streaming data.
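
One way to check whether a given server answers with chunked responses is to inspect its response headers. Here is a minimal sketch, assuming HTTPoison is available and using a hypothetical URL:

# Hypothetical URL; the check looks for the transfer-encoding header
# advertised by servers that stream their responses in chunks.
{:ok, %HTTPoison.Response{headers: headers}} =
  HTTPoison.head("https://example.com/large-file.csv")

chunked? =
  Enum.any?(headers, fn {name, value} ->
    String.downcase(name) == "transfer-encoding" and
      String.downcase(value) =~ "chunked"
  end)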

First draft

By following the documentation and various examples online, we ended up with this naive implementation:

def show(conn, %{"id" => _batch_id}) do
  remote_file_url = ":file_url:"
  content_type = ":content_type:"

  # Ask HTTPoison to stream the response as messages to the current process.
  %HTTPoison.AsyncResponse{id: ref} =
    HTTPoison.get!(remote_file_url, [], stream_to: self())

  updated_conn =
    conn
    |> put_resp_content_type(content_type, "utf-8")
    |> send_chunked(200)

  loop_receive(ref, updated_conn)
end

def loop_receive(ref, conn) do
  receive do
    %HTTPoison.AsyncStatus{id: ^ref} ->
      loop_receive(ref, conn)

    %HTTPoison.AsyncHeaders{id: ^ref} ->
      loop_receive(ref, conn)

    %HTTPoison.AsyncChunk{chunk: chunk, id: ^ref} ->
      {:ok, updated_conn} = chunk(conn, chunk)
      loop_receive(ref, updated_conn)

    %HTTPoison.AsyncEnd{id: ^ref} ->
      conn
  end
end

We tell HTTPoison where the data should be streamed, in this case to the current process (self()). We then declare that the response will be chunked with send_chunked(conn, 200); at this point, nothing has been sent to the browser yet. The loop_receive function then consumes the messages sent to the mailbox of the current process, and we send the response to the connection chunk by chunk, with chunk(conn, chunk).

Thanks to the Erlang observer, we noticed that our process mailbox would fill up with messages quite fast, even though the files were chunked. This is caused by stream_to: self(): HTTPoison consumes the remote resource as fast as it can, and although the response is delivered asynchronously, nothing slows the producer down.
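
Besides the observer, a quick way to spot this from code is to check the length of the process's message queue, using only the standard library:

# Number of messages currently waiting in this process's mailbox;
# with stream_to: self() alone, this number keeps growing.
{:message_queue_len, queued} = Process.info(self(), :message_queue_len)
IO.puts("mailbox size: #{queued}")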

In order to solve this issue, we need the async option. Since we want the browser (the consumer) to dictate how fast the data is consumed, we want more control, blocking after each chunk: this is what async: :once gives us.

def show(conn, %{"id" => _batch_id}) do
  remote_file_url = ":file_url:"
  content_type = ":content_type:"

  # With async: :once, HTTPoison delivers a single message at a time;
  # we must explicitly ask for the next one with HTTPoison.stream_next/1.
  %HTTPoison.AsyncResponse{} = resp =
    HTTPoison.get!(remote_file_url, [], stream_to: self(), async: :once)

  updated_conn =
    conn
    |> put_resp_content_type(content_type, "utf-8")
    |> send_chunked(200)

  loop_receive(resp, updated_conn)
end

def loop_receive(%HTTPoison.AsyncResponse{id: ref} = resp, conn) do
  receive do
    %HTTPoison.AsyncStatus{id: ^ref} ->
      resp |> stream_next() |> loop_receive(conn)

    %HTTPoison.AsyncHeaders{id: ^ref} ->
      resp |> stream_next() |> loop_receive(conn)

    %HTTPoison.AsyncChunk{chunk: chunk, id: ^ref} ->
      {:ok, updated_conn} = chunk(conn, chunk)
      resp |> stream_next() |> loop_receive(updated_conn)

    %HTTPoison.AsyncEnd{id: ^ref} ->
      conn
  end
end

defp stream_next(resp) do
  {:ok, ^resp} = HTTPoison.stream_next(resp)
  resp
end

Here we first make the HTTP request with async: :once, meaning that HTTPoison will only deliver one message at a time. To get the next chunk, we need to call stream_next on the response, until we have received the whole file content.

Out of this code, we get the following benefits:

- large files are handled efficiently;
- the process mailbox stays lean;
- nothing transits through the file system.

Refactor using Elixir Streams

While it works, this code is tightly coupled to the controller, notably to the conn Plug connection, and it is not reusable either. It would be nice to extract a reusable RemoteFileStreamer module. The streaming could then be used anywhere (e.g., in a controller or in a console), and its usage would be as simple as:

url
|> RemoteFileStreamer.stream
|> Enum.each(fn(chunk) -> IO.puts chunk end)

In order to extract a module solely responsible for file streaming, Elixir streams come in handy. A stream allows us to perform actions lazily: instead of operating on a giant set of data in memory, we can act on each element as it comes. In our case, though, we just want to pass each chunk along to another entity, as if we were handing buckets of water down a line to put out a fire.
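
As a toy illustration of this laziness, using only the standard library:

# The stream merely describes the computation; nothing runs until it
# is consumed, and only the elements actually requested are produced.
doubled = Stream.map(1..1_000_000, fn n -> n * 2 end)

Enum.take(doubled, 3)
#=> [2, 4, 6]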

A stream built with Stream.resource/3, which we use below, needs three functions to operate:

- a function that starts the stream and returns the initial resource;
- a function that emits the next values from the resource, or signals a halt;
- a function that cleans up once the stream has been consumed.
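
To make these three roles concrete before the real module, here is a minimal, self-contained sketch that emits the numbers 1 to 3:

one_to_three =
  Stream.resource(
    # start: build the initial accumulator
    fn -> 1 end,
    # next: emit a value and a new accumulator, or halt
    fn
      n when n <= 3 -> {[n], n + 1}
      n -> {:halt, n}
    end,
    # after: clean up (nothing to do here)
    fn _n -> :ok end
  )

Enum.to_list(one_to_three)
#=> [1, 2, 3]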

defmodule RemoteFileStreamer do
  @spec stream(String.t) :: Enumerable.t
  def stream(url) do
    Stream.resource(
      fn -> build_stream(url) end,
      fn resp -> read_stream(resp) end,
      fn _resp -> :ok end
    )
  end

  defp build_stream(remote_file_url) do
    HTTPoison.get!(remote_file_url, [], stream_to: self(), async: :once)
  end

  defp read_stream(resp) do
    %HTTPoison.AsyncResponse{id: ref} = resp

    receive do
      %HTTPoison.AsyncStatus{id: ^ref} ->
        continue(resp)

      %HTTPoison.AsyncHeaders{id: ^ref} ->
        continue(resp)

      %HTTPoison.AsyncChunk{chunk: chunk, id: ^ref} ->
        _ = stream_next(resp)
        {[chunk], resp}

      %HTTPoison.AsyncEnd{id: ^ref} ->
        {:halt, resp}
    end
  end

  defp continue(resp) do
    resp
    |> stream_next
    |> read_stream
  end

  defp stream_next(resp) do
    {:ok, ^resp} = HTTPoison.stream_next(resp)
    resp
  end
end

The controller now delegates to the file streamer:

  def show(conn, _params) do
    file_infos = %{filename: ":filename:", url: ":url:"}
    chunked_conn = prepare_chunked_conn(conn, file_infos.filename)

    file_infos.url
    |> RemoteFileStreamer.stream
    |> Enum.reduce(chunked_conn, fn(chunk, conn) ->
      {:ok, new_conn} = chunk(conn, chunk)
      new_conn
    end)
  end

  defp prepare_chunked_conn(conn, filename) do
    content_type = MIME.from_path(filename)

    conn
    |> put_resp_content_type(content_type, "utf-8")
    |> put_resp_header("content-disposition", ~s(attachment; filename="#{filename}"))
    |> send_chunked(200)
  end

Testing

To test this module as a black box, we need to simulate an HTTP server serving a document. Cowboy’s repository provides an Erlang example of how to serve chunked responses; have a look at the cowboy server for more details.
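
Alternatively, the fake server can be spun up from the test itself. Here is a sketch of such a black-box test, assuming the Bypass library is available to play the remote server; since chunk boundaries are not guaranteed to be preserved on the wire, we assert on the joined content:

defmodule RemoteFileStreamerTest do
  use ExUnit.Case, async: true

  test "streams the body of a chunked response" do
    bypass = Bypass.open()

    # The stand-in remote server answers with a chunked response.
    Bypass.expect(bypass, fn conn ->
      conn = Plug.Conn.send_chunked(conn, 200)
      {:ok, conn} = Plug.Conn.chunk(conn, "hello ")
      {:ok, conn} = Plug.Conn.chunk(conn, "world")
      conn
    end)

    body =
      "http://localhost:#{bypass.port}/file"
      |> RemoteFileStreamer.stream()
      |> Enum.join()

    assert body == "hello world"
  end
end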

Conclusion

In this blog post, you’ve seen how to deal with file streaming with Phoenix and HTTPoison, plus a refactoring using Elixir streams.

We have published a micro-library on Hex, so you don’t have to reinvent the wheel. The code is also available on GitHub.

Happy coding!

Thanks to Hugo Frappier and Nicholas Henry for their help and multiple reviews.

tags: elixir