Let’s put into practice what we’ve seen in the last few articles about concurrency. In this article we build our initial cryptocurrency example in full, using just the HTTPoison module and spawn, send and receive to handle concurrency. At the end we will refactor it using Task, which makes everything far easier!
Our goal is to download the prices of several products from Coinbase concurrently and return a map with the latest prices.
get_price function
Let’s see step by step how to get the current price of a single product.
iex> product_id = "BTC-USD"
iex> url = "https://api.pro.coinbase.com/products/#{product_id}/ticker"
iex> %HTTPoison.Response{body: body} = HTTPoison.get!(url)
%HTTPoison.Response{
body: "{\"trade_id\":60538353, ..."
headers: ...
...
}
iex> ticker = Jason.decode!(body)
%{
"price" => "3891.76000000",
...
}
iex> price = String.to_float(ticker["price"])
3891.76
Ok, it works and we get the price as a float, but I wouldn’t put this code in a function straight away: it’s tightly coupled to its dependencies and needs a bit of refactoring.
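One caveat about the last step, as a small sketch: String.to_float/1 raises an ArgumentError when the string has no decimal point, for example "3891". The session above got a price with decimals, but if that is not guaranteed (an assumption worth checking against the API), Float.parse/1 is a more forgiving alternative. PriceParser is a hypothetical module name used only for illustration.

```elixir
defmodule PriceParser do
  # Hypothetical helper, not part of the original article.
  # Float.parse/1 accepts both "3891.76" and "3891" and returns
  # {float, rest} on success, or :error when nothing can be parsed.
  def parse(string) when is_binary(string) do
    case Float.parse(string) do
      {price, ""} -> {:ok, price}
      # Trailing characters, or not a number at all.
      _ -> :error
    end
  end
end

with_decimals = PriceParser.parse("3891.76000000")
without_decimals = PriceParser.parse("3891")
not_a_number = PriceParser.parse("n/a")
```

Returning a tagged tuple instead of raising is a design choice of this sketch; the rest of the article keeps String.to_float/1 for brevity.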
Decoupling
Let’s start with something simple, defining a function ticker_url(product_id) to return the URL string for the given product.
defmodule Coinbase.Client do
@coinbase_base_url "https://api.pro.coinbase.com"
def ticker_url(product_id),
do: "#{@coinbase_base_url}/products/#{product_id}/ticker"
end
Easy.
Now, by using libraries like HTTPoison and Jason directly in our get_price function, we are coupling the get_price implementation to its external dependencies. If in the future we want to change our HTTP client to something like Tesla or Mint, we’ll need to change the implementation of every function that depends on it, all over our code.
Instead of doing so, we can create a new module called Coinbase.HTTPClient, which we use to wrap HTTPoison and Jason.
defmodule Coinbase.HTTPClient do
def get_json!(url) do
HTTPoison.get!(url)
|> Map.get(:body)
|> Jason.decode!()
end
end
For brevity we are not handling any HTTPoison or Jason errors. get_json!(url) makes an HTTP GET request and deserialises the JSON response, returning it in the form of a Map.
We now define our get_price function using only our HTTPClient module instead of HTTPoison and Jason.
defmodule Coinbase.Client do
alias Coinbase.HTTPClient
def get_price(product_id) do
product_id
|> ticker_url()
|> HTTPClient.get_json!()
|> Map.get("price")
|> String.to_float()
end
end
We see how this code is easier to read, thanks to the Elixir pipes, and also easier to change: if we want to switch from Jason to Poison, or from HTTPoison to Tesla, we just need to change our HTTPClient.get_json!/1 implementation.
This kind of refactoring is also shown in José Valim’s article, Mocks and explicit contracts, in which we see how decoupling makes it easier to test our code. If you haven’t read the article, I think it’s worth it.
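To make the testing benefit concrete, here is a minimal sketch of one possible approach. It assumes we pass the HTTP client module as an argument, which is a choice made only to keep the example self-contained. StubHTTPClient and PriceClient are hypothetical names, not part of the code above.

```elixir
defmodule StubHTTPClient do
  # Hypothetical stand-in for Coinbase.HTTPClient: same get_json!/1 shape,
  # but it returns a canned ticker instead of calling the network.
  def get_json!(_url), do: %{"price" => "3891.76000000"}
end

defmodule PriceClient do
  # Illustrative variant of Coinbase.Client. The HTTP client module is
  # passed in as an argument (an assumption of this sketch), so a test
  # can swap in the stub.
  @base_url "https://api.pro.coinbase.com"

  def ticker_url(product_id),
    do: "#{@base_url}/products/#{product_id}/ticker"

  def get_price(product_id, http_client) do
    product_id
    |> ticker_url()
    |> http_client.get_json!()
    |> Map.get("price")
    |> String.to_float()
  end
end

# No network involved: the price comes from the stub.
stub_price = PriceClient.get_price("BTC-USD", StubHTTPClient)
```

Because get_price only depends on something that responds to get_json!/1, the parsing logic can be exercised without a real HTTP request.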
In sequence
We define a new function to get multiple prices.
defmodule Coinbase.Client do
def get_prices(products) do
products
|> Enum.map(&get_price/1)
end
end
get_prices/1 enumerates the products list and sequentially requests the price for each one, returning a list of prices.
As we saw in a previous article where we started talking about concurrency, getting prices one at a time is not very efficient. Our computer is idle most of the time, waiting for the response from the server.
Let’s see how much time it takes to sequentially get prices for seven different products, so we can compare it later with the concurrent version.
To benchmark the time we define Coinbase.measure_time(func), which runs the passed function, calculating the elapsed time.
defmodule Coinbase do
def cyan_text(text) do
IO.ANSI.cyan() <> text <> IO.ANSI.reset()
end
def measure_time(func) do
time_start = System.monotonic_time(:millisecond)
result = func.()
time_end = System.monotonic_time(:millisecond)
seconds = (time_end - time_start)/1000
cyan_text("time #{seconds}s") |> IO.puts()
result
end
end
After reading Cool CLIs in Elixir (Part 2) with IO.ANSI, I decided to add some color to the benchmarking output. The time will then be printed in cyan, using the cyan_text function.
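As a side note, Erlang ships a helper that does the same kind of measurement. The sketch below uses :timer.tc/1, which runs a function and returns the elapsed time in microseconds together with the result. The summed range is just a placeholder workload, not something from the article.

```elixir
# :timer.tc/1 returns {elapsed_microseconds, result_of_the_function}.
{micros, result} = :timer.tc(fn -> Enum.sum(1..1_000) end)

# Convert microseconds to seconds, as measure_time/1 does with milliseconds.
IO.puts("time #{micros / 1_000_000}s")
```

Writing our own measure_time/1 is still handy here, since it prints the time and returns only the function result, which reads nicely in iex.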
Great, let’s get the prices of these seven products, sequentially, and see how much time we need.
iex> products = ["BTC-USD","ETH-USD","LTC-USD",
...> "BCH-USD","XRP-USD","XLM-USD",
...> "ZRX-USD"]
iex> import Coinbase, only: [measure_time: 1]
iex> measure_time fn ->
...> Coinbase.Client.get_prices(products)
...> end
time 2.132s
[3895.98, 136.01, 58.04, 141.6, 0.3131, 0.105945, 0.271112]
The function returns a list of prices: the first price corresponds to the first product in the list, “BTC-USD”, the second to “ETH-USD”, and so on. To make the result easier to read, it’s better to return a Map like this:
%{
"BTC-USD" => 3895.98,
"ETH-USD" => 136.01,
...
}
where the product is the key of the map, and the price is the value.
In get_prices/1, let’s change the function passed to Enum.map so that it returns a tuple with both product_id and price. We can then use Enum.into/2 to convert the list of 2-element tuples to a Map.
def get_prices(products) do
products
|> Enum.map(fn product_id ->
{product_id, get_price(product_id)}
end)
|> Enum.into(%{})
end
And this time it returns a human-readable result:
iex> Coinbase.Client.get_prices(products)
%{
"BCH-USD" => 141.66,
"BTC-USD" => 3895.99,
"ETH-USD" => 136.05,
...
}
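The map-then-into pair can also be written in one step with Map.new/2, which takes the list and a function returning {key, value} tuples. This is only an alternative sketch: fake_price is a hypothetical stand-in for get_price/1 so the example runs without a network call.

```elixir
# Hypothetical stand-in for get_price/1: returns a made-up float
# derived from the product id, with no HTTP request.
fake_price = fn product_id -> String.length(product_id) * 1.0 end

products = ["BTC-USD", "ETH-USD"]

# Map.new/2 builds the map directly from {key, value} tuples.
prices =
  Map.new(products, fn product_id ->
    {product_id, fake_price.(product_id)}
  end)

# Same result as mapping first and then calling Enum.into/2.
same_prices =
  products
  |> Enum.map(fn product_id -> {product_id, fake_price.(product_id)} end)
  |> Enum.into(%{})
```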
Concurrent
We saw that spawn runs the given function in a new process, returning immediately. With it we were able to make our requests concurrent but, without messages, we were only able to print the result.
We now make get_prices/1 concurrent without changing the function result. The output will still be a map with products and prices.
Let’s start by focusing on a single concurrent request; scaling to multiple requests will then be easy.
def spawn_and_send_price(product_id, dst_pid) do
spawn fn ->
price = get_price(product_id)
send dst_pid, {self(), {product_id, price}}
end
end
spawn_and_send_price/2 creates a new process and immediately returns its pid. As we saw in the previous article, the way we receive the result back is by using messages. For this reason we pass dst_pid as the second parameter: it is the pid of the process where we want to receive the result, in the form of a tuple.
{from_pid, {product_id, price}}
When we send the message back to dst_pid, we pass self() as the first element of the tuple. self() returns the pid of the process where it is called. In this case, the pid is that of the spawned process.
It’s always good to send some reference along with the message, so it’s easier for the receiver to understand what the message is about and who it is coming from. Passing the pid is also useful in the receive block, to filter only the messages coming from the chosen process.
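The filtering idea can be sketched on its own, without any HTTP request. In this minimal example two processes send a message tagged with their own pid, and the receiver picks the messages in the order it chooses, whatever the arrival order. The atoms :from_a and :from_b are made-up payloads.

```elixir
parent = self()

# Each spawned process sends a message tagged with its own pid.
pid_a = spawn(fn -> send(parent, {self(), :from_a}) end)
pid_b = spawn(fn -> send(parent, {self(), :from_b}) end)

# The pin operator (^) matches only messages whose first element is pid_b.
# A message from pid_a, if already delivered, stays in the mailbox.
result_b =
  receive do
    {^pid_b, result} -> result
  end

# Now we ask for the message coming from pid_a.
result_a =
  receive do
    {^pid_a, result} -> result
  end
```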
Let’s try spawn_and_send_price/2 on iex.
iex> self()
#PID<0.205.0>
iex> pid = Coinbase.Client.spawn_and_send_price("BTC-USD",self())
#PID<0.208.0>
iex> receive do
...> {^pid, result} -> result
...> end
{"BTC-USD", 3899.98}
Nice, we can define await/1 with exactly the receive block we’ve just used.
def await(pid) do
receive do
{^pid, result} -> result
end
end
await(pid) waits to receive, and returns, the result sent from the given pid.
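Note that this receive waits forever if the spawned process crashes before sending its message. A possible variation, shown here only as a sketch, adds an after clause with a timeout. AwaitSketch is a hypothetical module, and returning {:ok, result} or {:error, :timeout} is an assumption of this example, different from the await/1 above, which returns the bare result.

```elixir
defmodule AwaitSketch do
  # Hypothetical variant of await/1 with a timeout in milliseconds.
  def await(pid, timeout \\ 5_000) do
    receive do
      {^pid, result} -> {:ok, result}
    after
      # No matching message arrived in time.
      timeout -> {:error, :timeout}
    end
  end
end

parent = self()

# This process sends its result back, like spawn_and_send_price/2 does.
ok_pid = spawn(fn -> send(parent, {self(), {"BTC-USD", 3899.98}}) end)

# This one exits without sending anything.
silent_pid = spawn(fn -> :ok end)

ok_result = AwaitSketch.await(ok_pid)
timeout_result = AwaitSketch.await(silent_pid, 100)
```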
We now have everything to write a compact concurrent get_prices(products) function, using Elixir pipes.
def get_prices(products) do
products
|> Enum.map(fn product ->
spawn_and_send_price(product, self())
end)
|> Enum.map(&await/1)
|> Enum.into(%{})
end
- The first Enum.map runs spawn_and_send_price(product_id, dst_pid) for each product, returning a list of pids.
- The second Enum.map takes the list of pids as input, running await(pid) for each pid. It then returns the list of results.
- Enum.into transforms the list of tuples [{"BTC-USD", 3902.0}, {"ETH-USD", 135.98}, ..] into a Map.
Let’s try it while measuring the time on iex.
iex> measure_time fn ->
...> Coinbase.Client.get_prices(products)
...> end
time 0.421s
%{
"BTC-USD" => 3902.0,
"ETH-USD" => 135.98,
"LTC-USD" => 58.08,
"BCH-USD" => 142.52,
"XRP-USD" => 0.3133,
"XLM-USD" => 0.106001,
"ZRX-USD" => 0.269627
}
Fantastic, only 0.421 seconds. This version is roughly five times faster than the sequential one, which took 2.132 seconds.
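To reproduce the effect without depending on the network, here is a sketch where each request is simulated with a 100 ms sleep. FakeClient is a hypothetical module that mirrors the functions above, and the returned price is made up. With five products, the sequential version would need about 500 ms, while the concurrent one should take a little more than the slowest single request.

```elixir
defmodule FakeClient do
  # Hypothetical stand-in for Coinbase.Client: each "request" sleeps
  # 100 ms instead of calling the network and returns a made-up price.
  def get_price(_product_id) do
    Process.sleep(100)
    1.0
  end

  def spawn_and_send_price(product_id, dst_pid) do
    spawn(fn ->
      send(dst_pid, {self(), {product_id, get_price(product_id)}})
    end)
  end

  def await(pid) do
    receive do
      {^pid, result} -> result
    end
  end

  def get_prices(products) do
    products
    # Start every request first...
    |> Enum.map(&spawn_and_send_price(&1, self()))
    # ...then collect the results one pid at a time.
    |> Enum.map(&await/1)
    |> Enum.into(%{})
  end
end

products = ["BTC-USD", "ETH-USD", "LTC-USD", "BCH-USD", "XRP-USD"]

{micros, prices} = :timer.tc(fn -> FakeClient.get_prices(products) end)
IO.puts("#{map_size(prices)} prices in #{div(micros, 1000)} ms")
```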
Task
We’ve built our concurrent function using spawn, send and receive directly. This was useful to understand how concurrency is handled in Elixir, but usually it’s much better to use modules like Task, which make concurrency much easier.
“Conveniences for spawning and awaiting tasks.
Tasks are processes meant to execute one particular action throughout their lifetime, often with little or no communication with other processes.”
(from the Elixir Task documentation)
We refactor get_prices(products), getting rid of the two functions we wrote, spawn_and_send_price(product_id, dst_pid) and await(pid). We now use Task.async and Task.await, in a separate clause that takes :task as its second argument:
def get_prices(products, :task) do
products
|> Enum.map(&Task.async(fn -> {&1, get_price(&1)} end))
|> Enum.map(&Task.await/1)
|> Enum.into(%{})
end
The dynamic is very similar to the one we saw in our custom version. If we run just Task.async, we see that the messages are sent to our current process.
iex> products \
...> |> Enum.map(&Task.async(\
...> fn ->
...> {&1, Coinbase.Client.get_price(&1)}
...> end))
[%Task{}, %Task{}, ...]
iex> :erlang.process_info(self(), :messages)
{:messages,
[
{#Reference<...>, {"ETH-USD", 139.63}},
{#Reference<...>, {"LTC-USD", 60.52}},
{#Reference<...>, {"BCH-USD", 152.76}},
...
]}
Task.async returns a Task struct instead of a pid. Task.await then uses this struct to get the result from the iex process’ mailbox.
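The Task module also offers Task.async_stream/3, which combines the spawn and await steps and lets us cap how many tasks run at the same time. The following is only a sketch of that option, not something used in the rest of the article: fake_price is a hypothetical stand-in for get_price/1, and the max_concurrency and timeout values are arbitrary.

```elixir
# Hypothetical stand-in for get_price/1: sleeps briefly instead of
# calling the network and returns a made-up float.
fake_price = fn product_id ->
  Process.sleep(50)
  String.length(product_id) * 1.0
end

products = ["BTC-USD", "ETH-USD", "LTC-USD"]

prices =
  products
  |> Task.async_stream(
    fn product_id -> {product_id, fake_price.(product_id)} end,
    # At most two tasks run at once; each may take up to 5 seconds.
    max_concurrency: 2,
    timeout: 5_000
  )
  # Each successful task is emitted as {:ok, value}.
  |> Enum.map(fn {:ok, pair} -> pair end)
  |> Map.new()
```

Limiting concurrency can be useful when the list of products grows and we don’t want to open too many requests at once.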
For the most passionate, a small challenge for you!
After playing around with the code above, try to remove the last step, Enum.into, from get_prices. The function will then return a list of tuples {product, price}.
Try to get the prices multiple times! Do you see anything particular? The requests we make are concurrent, each one takes a random and different amount of time, but the results are always in the same order. Do you know why?
Any ideas on how to put the list of results in order from quickest request to the slowest?
Feel free to answer below, in the comments section! I look forward to your suggestions!
Credits
A special thanks to Greg Vaughn, who helped me give a perfect title to this article!