The code in this article is heavily inspired by the concepts amazingly explained in the book Designing Data-Intensive Applications by Martin Kleppmann.
Disclaimer: all the code you find in this article, and in the GitHub repo, is written purely for fun and is meant just as an experiment.
Published articles in the series:
- Part 1 (this article)
- Part 2
Introduction
Over the last year I’ve become interested in logs and in how something so simple can be the solid foundation of databases like Riak and Cassandra, and of streaming systems like Kafka.
In this series of articles we will see the different concepts behind a key-value store (Logs, Segments, Compaction, Memtable, SSTable) by implementing a simple engine in Elixir, which is a great language for building highly concurrent and fault-tolerant architectures.
In this first part we will see:
- What is a log?
- Making a KV store persistent using a log and an index. Through an example we will use throughout the series, crypto market prices and trades, we are going to see how to store the values in a log using an index.
- LogKV in Elixir. An initial, super-simple implementation in Elixir of a Writer, an Index and a Reader.
What is a log?
Let’s think about the most common type of log file, the one we use every day to debug events and error messages in our applications. This simple file has an interesting property: it’s append-only. This means that only sequential writes are allowed and everything we write to the log is immutable.
Why could sequential writes be important for us? Well… speed!
There is a huge difference between random and sequential access, on classic magnetic disks, on SSDs and even in memory. So the idea is to leverage sequential-access speed by using an append-only file to save the data of our key-value store.
Using a Log to implement a simple KV store
Let’s start with a simple example: a realtime application where we have to store the last price in dollars of Bitcoin (BTC), Ethereum (ETH) and Litecoin (LTC).
Key | Value |
---|---|
BTC | 4478.12 |
ETH | 133.62 |
LTC | 33.19 |
If we just need to keep this snapshot in memory, in Elixir we can use a Map. But persistence is another story: there are many different ways and technologies we could use to store this map and make it persistent.
If this snapshot were updated just a few times a day, with just a few currencies, then serialising the map to a file would be fine and easy to do. But this is obviously not our case! Our imaginary crypto application needs to keep track of every market movement, with hundreds of updates per second for hundreds of currencies.
But how can we use an append-only file, where written data is immutable by nature, to store the mutable data of a key-value store, leveraging sequential access while keeping our map persistent?
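For example, a minimal in-memory version of the snapshot above is just a Map:

prices = %{"BTC" => 4478.12, "ETH" => 133.62, "LTC" => 33.19}
Map.get(prices, "BTC")
# => 4478.12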
The idea is pretty simple:
- append to our log each single price update (value) for any currency (key)
- use our Map as an index, keeping track of the position and size of the values within our log file.

Consider this timeline of price updates:

- 17:14:59 – LTC trades at 32.85$. We append the string "32.85" to the log and we update the "LTC" key of our index (implemented with a Map) with the value’s offset (0, since it’s the first value in the file) and its size (5 bytes, since it’s a string).
- 17:15:00 – ETH trades at 130.98$. We append the string "130.98" to the log and we update the "ETH" key of our index with offset 5 and size 6 bytes.
- 17:15:01 – BTC trades at 4411.99$. We append the string "4411.99" to the log and we update the "BTC" key of our index with offset 11 and size 7 bytes.
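To make this concrete, here is a sketch of the index Map after these three appends (at this point the log file contains the string 32.85130.984411.99):

# each key maps to a {offset, size} tuple pointing into the log file
index = %{
  "LTC" => {0, 5},
  "ETH" => {5, 6},
  "BTC" => {11, 7}
}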
What happens if we receive a new price for ETH? How can we overwrite the value in the log, given that what we wrote is immutable and we can only append?
- 17:15:09 – ETH trades at 131.00$.
Since to leverage sequential writes we can only append, we simply append the new value and update the index with the new offset and size.
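Continuing the sketch above, the new string "131.00" (6 bytes) lands at offset 18, right after the previous values, and we just repoint the "ETH" key:

# the old value at {5, 6} is still in the log, but the index no longer points to it
index = Map.put(index, "ETH", {18, 6})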
Reads are efficient too. To retrieve a value from the log, we just use the offset and size stored in the index, and we need only one disk seek to load our value into memory.
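As a minimal sketch of the read path (assuming the log was saved to a hypothetical file named prices.log), a single positioned read is enough:

# open the log read-only, then read `size` bytes starting at `offset`
{:ok, fd} = File.open("prices.log", [:read, :binary])
{offset, size} = index["ETH"]
{:ok, value} = :file.pread(fd, offset, size)
# value == "131.00"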
LogKV in Elixir
We are going to write three different Elixir modules.
The Index holds the offset and size for all the keys. It can receive an update request from the Writer and a lookup request from the Reader to get offset and size.
The Writer can only append values to the log and send update requests to the Index. When we want to set a new value for a specific key, we will send a message to the writer.
The Reader can read values from the log, asking the Index for offset and size information. When we want to look up a key we will send a message to the reader.
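Before diving into the code, here is a preview of how the three modules we are about to build fit together (the file name prices.db is just an example):

{:ok, _index} = LogKV.Index.start_link([])
{:ok, _writer} = LogKV.Writer.start_link("prices.db")
{:ok, reader} = LogKV.Reader.start_link("prices.db")

{:ok, {_offset, _size}} = LogKV.Writer.put("BTC", "4478.12")
{:ok, "4478.12"} = LogKV.Reader.get(reader, "BTC")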
Some considerations about concurrency
We have only one writer appending to the log file, since having multiple parallel writers would just lead to data corruption.
We can have multiple readers for each log file, since this doesn’t lead to any concurrency problem. The read operation doesn’t need any lock and doesn’t block the writer, since the log is append-only and the values written are immutable.
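For example, nothing stops us from spawning several readers over the same log file and querying them in parallel. A quick sketch, using the LogKV.Reader module we will define below and assuming the Index and a Writer with some data are already running:

# start four readers on the same log file and query them concurrently
readers =
  for _ <- 1..4 do
    {:ok, pid} = LogKV.Reader.start_link("test.db")
    pid
  end

readers
|> Task.async_stream(fn pid -> LogKV.Reader.get(pid, "btc") end)
|> Enum.map(fn {:ok, result} -> result end)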
LogKV.Index
The index is implemented as a GenServer. Its state is a Map where the keys of the map are the keys we want to store (like "BTC", "ETH", "LTC" etc.) and the values are {offset, size} tuples.
Initialisation of the index
defmodule LogKV.Index do
use GenServer
def start_link([]) do
GenServer.start_link(__MODULE__, :empty, name: __MODULE__)
end
def init(:empty), do: {:ok, %{}}
end
At first we just need to start the index with an empty state, which is an empty Map. We need only one index, so by specifying the :name option we ensure that only one index, named LogKV.Index, can be started.
Update
def update(key, offset, size) do
GenServer.call(__MODULE__, {:update, key, offset, size})
end
def handle_call({:update, key, offset, size}, _from, index_map) do
{:reply, :ok, Map.put(index_map, key, {offset, size})}
end
The update/3 function sends a synchronous request (a call) to the LogKV.Index process to update the {offset, size} tuple for that specific key, where offset and size are the two values the LogKV.Reader needs to get the value from the log.
We could make it asynchronous with a cast, so the Writer doesn’t have to wait for a reply, but then we wouldn’t know when the Index has the key updated, nor when the Reader is able to access the updated data. In a few words, a data-consistency issue: we write asynchronously with a cast, and the reader still reads old data.
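For reference, the asynchronous variant we are deliberately avoiding would look something like this (hypothetical code, not part of LogKV):

# fire-and-forget update: update_async/3 would return immediately,
# with no guarantee about when the index actually gets updated
def update_async(key, offset, size) do
  GenServer.cast(__MODULE__, {:update, key, offset, size})
end

def handle_cast({:update, key, offset, size}, index_map) do
  {:noreply, Map.put(index_map, key, {offset, size})}
end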
Lookup
def lookup(key) do
GenServer.call(__MODULE__, {:lookup, key})
end
def handle_call({:lookup, key}, _from, index_map) do
{:reply, get_key_offset_size(key, index_map), index_map}
end
defp get_key_offset_size(key, index_map) do
case Map.get(index_map, key) do
{_offset, _size} = offset_size -> {:ok, offset_size}
nil -> {:error, :not_found}
end
end
The lookup/1 function returns:
- {:ok, {offset, size}} if the key exists
- {:error, :not_found} if the key doesn’t exist
I prefer to keep the handle_call light, moving the logic to a separate private function, get_key_offset_size/2.
You can find a complete version of the LogKV.Index module, with doctests, in the LogKV GitHub repo: index.ex
Let’s test it in iex
iex> LogKV.Index.start_link([])
{:ok, #PID<0.176.0>}
iex> LogKV.Index.start_link([])
{:error, {:already_started, #PID<0.176.0>}}
First we need to start the Index. We see that we can only have one index running, since the name is fixed.
iex> LogKV.Index.update("btc",0,10)
:ok
then we update the offset and size of a key
iex> LogKV.Index.lookup("btc")
{:ok, {0, 10}}
retrieve offset and size data from the index
iex> LogKV.Index.lookup("ltc")
{:error, :not_found}
and get a :not_found error when the index doesn’t have the key.
LogKV.Writer
The Writer has the responsibility of:
- creating/opening the log file during the initialisation process
- appending values and updating the index accordingly
Initialisation
defmodule LogKV.Writer do
use GenServer
def start_link(log_path) do
GenServer.start_link(__MODULE__, log_path, name: __MODULE__)
end
def init(log_path) do
fd = File.open!(log_path, [:write, :binary])
{:ok, %{fd: fd, current_offset: 0}}
end
end
Since for this implementation we need just one writer, for simplicity we force the name to LogKV.Writer during the start of the GenServer, like we did for LogKV.Index.
init/1 opens the file, saving the file pid and the current_offset into the writer’s state. The current offset is needed to know where we are inside the log file, so we can update the index with the correct absolute offset.
To make everything simple for this first implementation, there is no particular error handling here. If there is an issue with the file opening, an exception is raised.
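If we wanted to be more defensive, a hypothetical variant of init/1 (not used in this article) could stop the GenServer with the reason instead of raising:

# hypothetical alternative: report the failure to the supervisor instead of raising
def init(log_path) do
  case File.open(log_path, [:write, :binary]) do
    {:ok, fd} -> {:ok, %{fd: fd, current_offset: 0}}
    {:error, reason} -> {:stop, reason}
  end
end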
Put
def put(key, value) do
GenServer.call(__MODULE__, {:put, key, value})
end
def handle_call(
{:put, key, value},
_from,
%{fd: fd, current_offset: current_offset} = state) do
:ok = IO.binwrite(fd, value)
size = byte_size(value)
LogKV.Index.update(key, current_offset, size)
new_state = %{state | current_offset: current_offset + size}
{:reply, {:ok, {current_offset, size}}, new_state}
end
The LogKV.Writer.put/2 function sends a message to the LogKV.Writer process, which appends the value to the log file with IO.binwrite(fd, value). Then it updates the index, using the current_offset to know where the value is within the log file and the value’s size to know how many bytes we need to read from the file.
We then update the writer’s state, setting the new current_offset.
In this way we make the values persistent. The index is not persistent though, since its state is only kept in memory. We will see in the second part (next week) how we can simply store the index metadata along with the values in the log file, making the index recoverable.
Full code at this link: writer.ex
Let’s try the writer in iex
iex> LogKV.Index.start_link([])
iex> LogKV.Writer.start_link("test.db")
{:ok, #PID<0.197.0>}
After the Index, we start the LogKV.Writer process, passing the path of our log file, test.db in this case. You’ll see that the test.db file is created, in the directory where you started the iex session, once you start the Writer process.
iex> LogKV.Writer.put("ltc","32.85")
{:ok, {0, 5}}
We save the string "32.85" for the key "ltc". Let’s add a few more keys
iex> LogKV.Writer.put("eth","130.98")
{:ok, {5, 6}}
iex> LogKV.Writer.put("btc","4411.99")
{:ok, {11, 7}}
iex> LogKV.Index.lookup("btc")
{:ok, {11, 7}}
You can see how the offset increases, that the size is the number of bytes of the value, and how the Index is updated for us by the Writer.
If we check what’s inside the file test.db, this is what we see
$ cat test.db
32.85130.984411.99
which is the result of appending the values one after the other. We are able to distinguish them thanks to the data in the Index.
If we update a key, the new value is appended to the file and the old value remains where it was in the log file.
iex> LogKV.Writer.put("eth","131.00")
{:ok, {18, 6}}
$ cat test.db
32.85130.984411.99131.00
The new offset is 18, which means the Index now points to the new value.
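We can confirm this by asking the Index directly, which now returns the new offset and size:

iex> LogKV.Index.lookup("eth")
{:ok, {18, 6}}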
LogKV.Reader
The Reader’s aim is to pull values from the log file using the Index. Since the Reader opens the file in read-only mode, and the data appended to the log file is immutable, we can start many of them concurrently, serving values in parallel.
Initialisation
defmodule LogKV.Reader do
use GenServer
def start_link(log_path) do
GenServer.start_link(__MODULE__, log_path)
end
def init(log_path) do
fd = File.open!(log_path, [:read, :binary])
{:ok, %{fd: fd}}
end
end
The reader doesn’t have a fixed name because we can have multiple readers for a single log file. The file is opened during initialisation and the file pid is kept in the state.
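And indeed, unlike the Index, we can start as many readers as we like on the same file; each call returns a distinct process:

iex> {:ok, reader1} = LogKV.Reader.start_link("test.db")
iex> {:ok, reader2} = LogKV.Reader.start_link("test.db")
iex> reader1 == reader2
false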
Get – Reading values from the log
def get(pid, key) do
GenServer.call(pid, {:get, key})
end
def handle_call({:get, key}, _from, %{fd: fd} = state) do
case LogKV.Index.lookup(key) do
{:ok, {offset, size}} ->
{:reply, :file.pread(fd, offset, size), state}
{:error, _} = error ->
{:reply, error, state}
end
end
get/2 sends a call message to the Reader process identified by pid.
If the key doesn’t exist in the Index we just proxy the error, which will be {:error, :not_found}.
If the key is found, we use the Erlang function :file.pread/3, which seeks the file descriptor to the offset location and reads size bytes.
Full code at this link: reader.ex
Let’s try the Reader in iex. We first need to start the Index and the Writer; let’s assume we have the Index and the Writer from the previous example.
iex> {:ok, pid} = LogKV.Reader.start_link "test.db"
iex> LogKV.Reader.get(pid, "btc")
{:ok, "4411.99"}
It seems to work great. Let’s try to get the value of the eth key, which has both an old and a new value in the log.
iex> LogKV.Reader.get(pid, "eth")
{:ok, "131.00"}
Good, we see that it gets the correct value, the latest one we put.
GitHub Repo
At this commit you can find the full working code of this article’s implementation.
Wrap up
We learned how to leverage sequential writes by implementing an Index, a Writer and a Reader. This is a super-simple implementation, but it helped us introduce the topic conceptually. There are obviously several issues, like:
- The index is kept in memory. Memory therefore limits the number of keys we can have in our storage engine.
- If our storage engine crashes, we lose the index (which is only in memory) without being able to recover the data. This can be fixed by appending the keys along with the values; in this way we are able to recover the index by scanning the log file.
- The log grows indefinitely, keeping the old values. We need to put a cap on the log size and get rid of the old values. This leads to important concepts like segments and compaction.
In the next parts we will dig into these issues, expanding the implementation of our storage engine.