Cryptocurrency exchanges usually offer their realtime feed for free and, as with Coinbase Pro, without even requiring an account. This gives us a great way to build an architecture around realtime market data.
In this article we see how to build an Elixir application that receives realtime updates from the Coinbase WebSocket feed and handles crashes and disconnections.
Realtime Crypto Market Data
Coinbase is one of the best-known and most widely used exchanges for trading cryptocurrencies. Like many other crypto exchanges, it provides a realtime feed that streams all the different events from the exchange. If we register for the service, we see a trading view like the one below.
We mainly focus on the stream of new trades at the bottom right, in the Trade History column, which shows the trades of the last few minutes in descending order. The last trade price in the top bar is $3356.01, which matches the trade in the first row, executed at 12:36:30. For the moment, the only data we want to download is the stream of trades.
Let’s take a look at the documentation of the websocket-feed.
We first need to connect to the websocket service, using the wss://ws-feed.pro.coinbase.com URI.
Once connected, we just need to send a subscription JSON message like this:
{
  "type": "subscribe",
  "product_ids": [
    "BTC-USD"
  ],
  "channels": [
    "matches"
  ]
}
There are many channels: level2, ticker, heartbeat, matches, etc. We focus just on the matches channel, which represents the stream of trades. In this case we subscribe to the “BTC-USD” product, but we could subscribe to many different products at the same time.
WebSocket Connection
Let’s create a new Elixir application called coinbase, with the --sup option, which creates an OTP application skeleton for us, including a supervision tree. You can find the full code at this GitHub repo.
$ mix new coinbase --sup
We now need a websocket client. I’ve been using WebSockex for the last few months and, as of now, it’s actively maintained. Let’s add it to our dependencies in the mix.exs file, along with the Jason JSON library. We will need Jason to encode and decode JSON strings.
# coinbase/mix.exs
def deps do
  [
    {:websockex, "~> 0.4.2"},
    {:jason, "~> 1.1"}
  ]
end
Then run the deps.get command to download the dependencies.
$ mix deps.get
Resolving Hex dependencies...
Dependency resolution completed:
Unchanged:
jason 1.1
websockex 0.4.2
* Getting websockex (Hex package)
* Getting jason (Hex package)
Great, we can now start building our Coinbase client using WebSockex.
Let’s focus first on the client’s connection.
defmodule Coinbase.Client do
  use WebSockex

  @url "wss://ws-feed.pro.coinbase.com"

  def start_link(product_ids \\ []) do
    WebSockex.start_link(@url, __MODULE__, :no_state)
  end

  def handle_connect(conn, state) do
    IO.puts "Connected!"
    {:ok, state}
  end
end
We’ve created a Coinbase.Client module that simply connects to the Coinbase server.
- line 2: use WebSockex injects the WebSockex functionality into our module
- line 7: WebSockex.start_link/3 starts a websocket client in a separate process and returns an {:ok, pid} tuple
- line 10: the handle_connect/2 callback is called when the client is connected
Let’s run it in Elixir’s interactive shell:
$ iex -S mix
iex> {:ok, _pid} = Coinbase.Client.start_link []
Connected!
{:ok, #PID<0.393.0>}
Subscription Frame
Our module connects correctly to the Coinbase websocket server. To be able to receive the trades through the websocket connection, we need to subscribe to the matches channel.
def subscription_frame(products) do
  subscription_msg =
    %{
      type: "subscribe",
      product_ids: products,
      channels: ["matches"]
    }
    |> Jason.encode!()

  {:text, subscription_msg}
end
The subscription_frame/1 function returns a frame ready to be sent to the server. A frame is a {type, data} tuple where, in our case, type is :text and data is a JSON string.
The products argument is a list of Coinbase product ids, which are strings like "BTC-USD", "ETH-USD", "LTC-USD".
We build the subscription message using a Map with type, product_ids and channels keys. This map is then converted to a JSON string using the Jason.encode!/1 function. The returned frame looks like this (the order of the keys in the JSON string may differ)
{:text, "{\"type\":\"subscribe\",\"product_ids\":[\"BTC-USD\"],\"channels\":[\"matches\"]}"}
We then add a subscribe function to our module, which we will use once connected.
def subscribe(pid, products) do
  WebSockex.send_frame(pid, subscription_frame(products))
end
The subscribe/2 function builds the {:text, json_msg} subscription frame using the subscription_frame(products) function we’ve just seen and sends it to the server using WebSockex.send_frame(pid, frame), where pid is the process id of the websocket client.
Handling WebSocket Frames
Once subscribed, the Coinbase server starts sending us messages in JSON format. We now need to implement the handle_frame callback, which is called when the client receives data from the server.
def handle_frame(_frame = {:text, msg}, state) do
  msg
  |> Jason.decode!()
  |> IO.inspect()

  {:ok, state}
end
We’ve just implemented a simple handle_frame(frame, state) function where we
- pattern match the frame, extracting the JSON message string
- decode it using Jason.decode!/1, which converts the JSON string to a Map
- print the map to the standard output
The callback returns a {:ok, new_state} tuple, useful if we need to update our state (like handle_cast/2 in GenServer).
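To illustrate how the returned state can carry information from one frame to the next, here is a minimal standalone sketch. The FrameCounter module and its state map are hypothetical: the module does not use WebSockex, it only mirrors the shape of the handle_frame/2 callback so that it can be run on its own. It counts the text frames and ignores any other frame instead of crashing on it.

```elixir
defmodule FrameCounter do
  # Hypothetical module: mirrors the shape of the handle_frame/2 callback,
  # but is plain Elixir so it can run without a websocket connection.

  # Text frames: bump the counter and remember the last payload.
  def handle_frame({:text, msg}, state) do
    {:ok, %{state | count: state.count + 1, last: msg}}
  end

  # Any other frame is ignored and the state is returned unchanged.
  def handle_frame(_frame, state), do: {:ok, state}
end

state = %{count: 0, last: nil}
{:ok, state} = FrameCounter.handle_frame({:text, "{\"type\":\"match\"}"}, state)
{:ok, state} = FrameCounter.handle_frame({:binary, <<1, 2, 3>>}, state)

# Only the text frame was counted.
IO.inspect(state)
```

In the real client, the same idea would apply to whatever state is passed as the third argument of WebSockex.start_link/3 in place of :no_state.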
We are ready to test our client in iex, connecting and subscribing manually, hoping to see some trades flowing in.
iex> products = ["BTC-USD"]
iex> {:ok, pid} = Coinbase.Client.start_link products
connected!
{:ok, #PID<0.200.0>}
iex> Coinbase.Client.subscribe pid, products
%{
....
"price" => "3562.91000000", "product_id" => "BTC-USD",
"side" => "sell", "size" => "0.01341129",
"time" => "2018-12-18T14:43:13.254000Z","trade_id" => 56138234,
"type" => "last_match",
}
%{
"channels" => [%{"name" => "matches", "product_ids" => ["BTC-USD"]}],
"type" => "subscriptions"
}
%{
"maker_order_id" => "......",
"price" => "3562.91000000",
"product_id" => "BTC-USD",
"sequence" => .....,
"side" => "sell",
"size" => "0.13762144",
"taker_order_id" => ".....",
"time" => "2018-12-18T14:43:15.177000Z",
"trade_id" => 56138235,
"type" => "match"
}
...
It’s working! We are receiving data from the server.
The first message always has "type" => "last_match". It is needed when we have a recovery process that downloads the missed trades after a disconnection (beyond the scope of this article). There is also a confirmation of our subscription, with "type" => "subscriptions". But what we are really interested in are the "type" => "match" messages, which are the live trades.
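Note that price, size and time arrive as strings. As a rough sketch of how a match message could be turned into typed values, the hypothetical TradeParser module below (not part of the article’s client) assumes a decoded map with the string keys shown in the output above. It uses floats for brevity; for real money calculations a decimal type, such as the one provided by the Decimal library, would probably be a safer choice.

```elixir
defmodule TradeParser do
  # Hypothetical helper, not part of the article's client.
  # Assumes `msg` is a decoded "match" message with the string keys
  # shown in the sample output.
  def parse(%{"type" => "match", "price" => price, "size" => size, "time" => time} = msg)
      when is_binary(price) and is_binary(size) and is_binary(time) do
    with {price, ""} <- Float.parse(price),
         {size, ""} <- Float.parse(size),
         {:ok, time, _offset} <- DateTime.from_iso8601(time) do
      {:ok,
       %{
         product_id: msg["product_id"],
         trade_id: msg["trade_id"],
         side: msg["side"],
         price: price,
         size: size,
         time: time
       }}
    else
      # Any field that fails to parse makes the whole trade invalid.
      _ -> {:error, :invalid_trade}
    end
  end

  # Everything that is not a well-formed "match" message.
  def parse(_other), do: {:error, :not_a_match}
end

TradeParser.parse(%{
  "type" => "match",
  "product_id" => "BTC-USD",
  "trade_id" => 56138235,
  "side" => "sell",
  "price" => "3562.91000000",
  "size" => "0.13762144",
  "time" => "2018-12-18T14:43:15.177000Z"
})
|> IO.inspect()
```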
To make the subscription automatic, we need to change the Coinbase.Client.start_link function
def start_link(products \\ []) do
  {:ok, pid} = WebSockex.start_link(@url, __MODULE__, :no_state)
  subscribe(pid, products)
  {:ok, pid}
end
In this way we start the WebSockex process with WebSockex.start_link/3, which connects to the server and returns the pid. We then use this pid to subscribe, calling the subscribe function implemented before, and finally return the {:ok, pid} tuple.
Filtering the trades is really easy with pattern matching. We can’t pattern match the message directly in the handle_frame/2 function, since msg is a JSON string and we first need to convert it into a Map using Jason.decode!/1. We therefore define a new function, handle_msg/2.
def handle_frame({:text, msg}, state) do
  handle_msg(Jason.decode!(msg), state)
end

def handle_msg(%{"type" => "match"} = trade, state) do
  IO.inspect(trade)
  {:ok, state}
end

def handle_msg(_, state), do: {:ok, state}
The first handle_msg/2 clause requires "type" => "match" and is called only when the message satisfies this condition.
The second clause, handle_msg(_, state), works as a catch-all: it ignores every other message and just returns {:ok, state}.
This works like a filter and is equivalent to using case
def handle_msg(msg, state) do
  case msg do
    %{"type" => "match"} = trade -> IO.inspect(trade)
    _ -> :ignore
  end

  {:ok, state}
end
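To check that the two styles really select the same messages, here is a small standalone sketch that runs both on plain maps. The MatchFilter module, its function names and its return values are hypothetical and exist only for this comparison; the sample messages are trimmed versions of the ones printed earlier.

```elixir
defmodule MatchFilter do
  # Hypothetical module, used only to compare the two styles on plain maps.

  # Style 1: one function clause per message shape.
  def by_clause(%{"type" => "match"} = trade), do: {:trade, trade}
  def by_clause(_other), do: :ignore

  # Style 2: a single clause with a case expression.
  def by_case(msg) do
    case msg do
      %{"type" => "match"} = trade -> {:trade, trade}
      _ -> :ignore
    end
  end
end

messages = [
  %{"type" => "last_match", "trade_id" => 56138234},
  %{"type" => "subscriptions"},
  %{"type" => "match", "trade_id" => 56138235}
]

# Print the result of both styles side by side for every message.
Enum.each(messages, fn msg ->
  IO.inspect({MatchFilter.by_clause(msg), MatchFilter.by_case(msg)})
end)
```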
Let’s go back to iex and see what happens now when we simply start our Coinbase client
iex> Coinbase.Client.start_link ["BTC-USD"]
connected!
{:ok, #PID<0.184.0>}
%{
"maker_order_id" => "9050ea18-440f-442e-9129-358d77351685",
"price" => "3531.42000000",
"product_id" => "BTC-USD",
"sequence" => 7584073701,
"side" => "sell",
"size" => "0.04000084",
"taker_order_id" => "e5df8af2-20d0-4592-af98-e41bbb2ed8a9",
"time" => "2018-12-18T15:20:01.237000Z",
"trade_id" => 56140459,
"type" => "match"
}
Great, it works!
You can find the full code at this link: client.ex
Elixir Supervision
We can monitor disconnections by implementing the handle_disconnect(conn, state) callback
def handle_disconnect(_conn, state) do
  IO.puts "disconnected"
  {:reconnect, state}
end
Returning {:reconnect, state}, we ask WebSockex to reconnect using the same process. Unfortunately this solution isn’t the best in our case, because the subscription happens inside the Coinbase.Client.start_link function, which is not called again when the connection is re-established.
We opt instead to let the process exit, returning :ok instead of :reconnect.
def handle_disconnect(_conn, state) do
  IO.puts "disconnected"
  {:ok, state}
end
It’s easy to test a disconnection: just switch off the wifi (or disconnect the ethernet cable), which causes the client to fail with a timeout error.
iex> Coinbase.Client.start_link ["BTC-USD"]
...
%{"type" => "match", ...}
%{"type" => "match", ...}
...
# switch off the connection and wait
15:33:51.839 [error] [83, 83, 76, 58, 32, 83, 111, 99, 107, 101, 116, 32, 101, 114, 114, 111, 114, 58, 32, 'etimedout', 32, 10]
disconnected
** (EXIT from #PID<0.182.0>) shell process exited with reason: {:remote, :closed}
iex>
Since we used the WebSockex.start_link function, the websockex process was linked to our iex process, which is taken down after the error.
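The effect of a link can be reproduced without any websocket. In the sketch below, the hypothetical LinkDemo module spawns a linked process that exits with the same reason seen in the log above. The caller traps exits, so it receives the exit as a message instead of being taken down the way the iex shell was.

```elixir
defmodule LinkDemo do
  # Hypothetical module: shows what a linked process delivers to its parent
  # when it exits abnormally. Exits are trapped so the caller receives a
  # message instead of crashing.
  def run do
    previous = Process.flag(:trap_exit, true)

    # Stand-in for the websocket client exiting after a disconnection.
    pid = spawn_link(fn -> exit({:remote, :closed}) end)

    reason =
      receive do
        {:EXIT, ^pid, reason} -> reason
      after
        1_000 -> :timeout
      end

    # Restore the caller's original trap_exit setting.
    Process.flag(:trap_exit, previous)
    reason
  end
end

IO.inspect(LinkDemo.run())
```

Without the trap_exit flag, the exit signal would terminate the caller as well.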
To cope with the client’s disconnections and crashes, we need to add some basic supervision. When we created the project, we passed the --sup option to the mix command. This created a supervision skeleton we can now use.
Let’s open the application.ex file and add the processes we want to monitor.
defmodule Coinbase.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Coinbase.Client, ["BTC-USD"]}
    ]

    opts = [strategy: :one_for_one, name: Coinbase.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
At line 6, we’ve added the {Coinbase.Client, products} tuple. The Supervisor then starts and monitors our process, which is started with Coinbase.Client.start_link(["BTC-USD"]). In the case of a disconnection (or a crash) it will start a new client.
Running the Application
$ mix run --no-halt
connected!
%{
...
"price" => "3521.20000000",
...
"type" => "match"
}
$ iex -S mix
...
connected!
Interactive Elixir (1.7.3) - press Ctrl+C to exit (type h() ENTER for help)
iex> %{
...
"price" => "3522.85000000",
...
"type" => "match"
}
In both cases the application starts and runs the Coinbase.Supervisor which, in turn, starts and monitors the Coinbase.Client.
What happens if we kill the Coinbase.Client process? We can get the pid from the list of processes monitored by the Coinbase.Supervisor and kill it using the Process.exit(pid, reason) function.
[{Coinbase.Client, pid, _, _}] = Supervisor.which_children(Coinbase.Supervisor)
Process.exit(pid, :kill)
Let’s see it in action
$ iex -S mix
...
connected!
...
%{ ..., "price" => "3522.85000000", "type" => "match"}
...
iex> [{Coinbase.Client, pid, _, _}]= Supervisor.which_children(Coinbase.Supervisor)
Process.exit(pid, :kill)
true
iex(6)> connected!
%{ ..., "price" => "3535.00000000", "type" => "match"}
We see that after we kill the client process, the supervisor starts a new client that connects immediately to the Coinbase server.
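The same restart behaviour can be observed in isolation, without a network connection. The sketch below is an assumption-laden stand-in: RestartDemo and RestartDemo.Worker are hypothetical modules, and the worker is a bare GenServer that takes the place of Coinbase.Client. It starts a one_for_one supervisor, kills the child, and polls the supervisor until a new pid appears.

```elixir
defmodule RestartDemo.Worker do
  # Hypothetical stand-in for Coinbase.Client: a GenServer that only
  # holds the list of products.
  use GenServer

  def start_link(products), do: GenServer.start_link(__MODULE__, products)

  @impl true
  def init(products), do: {:ok, products}
end

defmodule RestartDemo do
  # Starts a supervisor, kills its only child and returns {old_pid, new_pid}.
  def run do
    children = [{RestartDemo.Worker, ["BTC-USD"]}]
    {:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

    [{RestartDemo.Worker, old_pid, _, _}] = Supervisor.which_children(sup)
    Process.exit(old_pid, :kill)

    {old_pid, wait_for_restart(sup, old_pid)}
  end

  # Polls the supervisor until the child has been replaced by a new process.
  defp wait_for_restart(sup, old_pid) do
    case Supervisor.which_children(sup) do
      [{_id, pid, _type, _modules}] when is_pid(pid) and pid != old_pid ->
        pid

      _not_restarted_yet ->
        Process.sleep(10)
        wait_for_restart(sup, old_pid)
    end
  end
end

{old_pid, new_pid} = RestartDemo.run()
IO.inspect({old_pid, new_pid, Process.alive?(new_pid)})
```

The new process starts from scratch, which is why in our client the subscription placed in start_link is sent again after every restart.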
Wrap Up
This is an initial, simple implementation of a supervised Coinbase client that we can use to start processing the stream of trades.
We’ve implemented basic supervision, which is great if you don’t mind losing some trades after a disconnection, but it is not enough if you want a properly fault-tolerant client that is able to recover lost data. Between the moment a disconnection occurs and the moment the client is reconnected and ready to receive trades again, other trades may have happened. In that case we would have lost them.
A solution for properly recovering the lost trades is to check the id of the last trade and then download the missing trades using a different API, like Get Trades in Coinbase.
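As a rough sketch of the first half of that idea, the hypothetical TradeGap module below works out which trade ids were missed. It assumes that trade ids for a product increase by one per trade, as the sample output suggests (56138234 followed by 56138235); this assumption should be checked against the Coinbase documentation. Downloading the trades themselves is not shown.

```elixir
defmodule TradeGap do
  # Hypothetical helper. Assumes trade ids for a product increase by one
  # per trade, as in the sample output (56138234, 56138235).
  #
  # `last_seen_id` is the id of the last trade processed before the
  # disconnection; `last_match_id` is the id carried by the "last_match"
  # message received after reconnecting.
  #
  # Returns the ids of the trades that happened while we were offline,
  # including the one reported by "last_match".
  def missing_ids(last_seen_id, last_match_id)
      when is_integer(last_seen_id) and is_integer(last_match_id) and
             last_match_id > last_seen_id do
    Enum.to_list((last_seen_id + 1)..last_match_id)
  end

  # Nothing was missed, or the ids are not usable.
  def missing_ids(_last_seen_id, _last_match_id), do: []
end

IO.inspect(TradeGap.missing_ids(56138235, 56138238))
```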