We need to process a large CSV file of minute-by-minute volumes and prices. Our task is pretty simple: we just want to get the first line of the year 2015 with valid data. At first, this seems an easy task we could tackle with String.split and Enum.map/filter/find. But what happens when the CSV file is large?
Let’s see how with Elixir Streams we can elegantly manage large files and create composable processing pipelines.
Getting a large CSV from Kaggle
We need at first a real and large CSV file to process, and Kaggle is a great place where we can find this kind of data to play with. To download the CSV file, just go to the Kaggle Bitcoin Historical Data page.
The CSV contains the historical Bitstamp BTC-USD prices and volumes, aggregated over 1-minute intervals. Once unzipped, the size is around 220Mbyte and it has 8 columns and 3.6 million rows.
We will focus on the first two columns, Timestamp and Open price. Rows without valid data have NaN in the Open column, so we filter them out. Converting each Timestamp to a DateTime struct, we then find the first row of the year 2015.
The greedy approach
File.read!("large.csv")
|> String.split("\n")
|> Enum.map(&String.split(&1, ","))
|> Enum.filter(fn
["Timestamp" | _] -> false
[_, "NaN" | _] -> false
_ -> true
end)
|> Enum.find(fn
[timestamp | _] ->
dt =
timestamp
|> String.to_integer()
|> DateTime.from_unix!()
dt.year == 2015
end)
|> IO.inspect()
# RESULT
["1420070400", "321", ...]
The first column is the timestamp, where 1420070400 means 1st Jan 2015 00:00.
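We can double-check this in iex (on recent Elixir versions the DateTime is rendered with the ~U sigil):

iex> DateTime.from_unix!(1420070400)
~U[2015-01-01 00:00:00Z]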
It works… but this approach hides big memory and processing issues. With the Erlang Observer we can easily see the total memory allocated by our process.
iex> :observer.start
Just processing a 220Mbyte CSV file, which is not even that big, we had a crazy 5.5GB peak of allocated memory.
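If you prefer numbers to the Observer GUI, :erlang.memory/1 gives a rough idea. This is only a sketch: it reports the memory currently allocated by the VM, not the peak, so the Observer remains the best tool to spot spikes.

before = :erlang.memory(:total)

rows =
  File.read!("large.csv")
  |> String.split("\n")
  |> Enum.map(&String.split(&1, ","))

# bytes still allocated while `rows` is referenced; the peak reached
# during processing is higher than this figure
:erlang.memory(:total) - before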
Let’s inspect the code to understand what it does and how the data moves between each step.
line 1: File.read!("large.csv")
loads the whole CSV file into memory. Just in this first step, we allocate 220Mbyte. And that’s just the beginning.
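A quick way to see that the whole file really ends up in memory as one single binary (a sketch; the exact number depends on your copy of the file):

iex> text = File.read!("large.csv")
iex> byte_size(text)
# around 220 million bytes, matching the size of the file on disk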
line 2: String.split(text,"\n")
takes the loaded text and splits it into lines, creating a list of new strings representing the rows of the CSV.
line 3: Enum.map(rows, &String.split(&1, ","))
splits each single CSV row into a list of columns. The first column is the Timestamp, the second the Open price, and so on.
[
  ["Timestamp", "Open", "High", "Low", "Close", ...],
  ...
  ["1420070400", "321", ...],
  ...
  ["1541888400", "6359.96", ...],
  ["1541888460", "NaN", ...],
  ...
]
line 4: We use Enum.filter/2
and pattern matching to filter out the header and rows with NaN
values in the Open column.
[
  ...
  ["1420070400", "321", ...],
  ...
  ["1541888400", "6359.96", ...],
  ["1541888520", "6363.73", ...],
  ...
]
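To see the pattern matching of this filter in isolation, here is a minimal sketch with a few hand-written rows (hypothetical data, not taken from the real file):

rows = [
  ["Timestamp", "Open", "High"],
  ["1420070340", "NaN", "NaN"],
  ["1420070400", "321", "321"]
]

Enum.filter(rows, fn
  # the header row starts with the literal string "Timestamp"
  ["Timestamp" | _] -> false
  # rows whose second column (the Open price) is "NaN" are dropped
  [_, "NaN" | _] -> false
  # everything else is kept
  _ -> true
end)
# => [["1420070400", "321", "321"]]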
line 9: Enum.find/2
loops over the whole list of mapped and filtered rows, returning the result once the condition is matched.
["1420070400", "321", ...]
We see how each one of these functions processes the entire dataset at every step: String.split splits the whole text into lines, Enum.map maps all the lines into columns, and so on. This is a huge waste of memory and processing resources.
Lazy Processing with Elixir Streams
We don’t need to load all the data in memory! We can actually try to load and process one line of text at a time. This is where Elixir Streams come into play!
Streams are composable, lazy enumerables
https://hexdocs.pm/elixir/Stream.html
Lazy means that when we use a Stream to process a CSV file, it doesn’t open and load the whole file. Instead, it reads one line at a time. We can compose complex pipelines with different processing steps where the stream reads and pipes one single line at a time, without having to process all the lines at every single step.
In the image above, we see how one single line is read and processed by the whole pipeline. Once the final step finishes processing it, a new line is read and piped through the stream.
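To get a feeling for this laziness, consider a tiny sketch unrelated to our CSV: mapping over a big range with Stream.map and taking just the first three elements runs the mapping function only three times.

1..1_000_000
|> Stream.map(fn n ->
  IO.puts("processing #{n}")
  n * 2
end)
|> Enum.take(3)
# prints "processing 1", "processing 2", "processing 3"
# => [2, 4, 6]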
Instead of opening a file with File.open, we use the function File.stream! to create a Stream.
iex> File.stream!("large.csv")
%File.Stream{
line_or_bytes: :line,
modes: [:raw, :read_ahead, :binary],
path: "large.csv",
raw: true
}
File.stream!("large.csv")
returns a Stream without opening the file. We can use this stream to compose a pipeline using the Stream module functions.
iex> File.stream!("large.csv") |> Stream.map(&String.split(&1,","))
#Stream<[
enum: %File.Stream{
line_or_bytes: :line,
modes: [:raw, :read_ahead, :binary],
path: "large.csv",
raw: true
},
funs: [#Function<48.51129937/1 in Stream.map/2>]
]>
Instead of using Enum.map, we use Stream.map. In the returned stream we can see the funs property, which is the list of functions that will be applied to each row. At the moment there is no processing: we are only composing our pipeline.
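We can convince ourselves that nothing has run yet: adding a side effect to the mapping function prints nothing until the stream is enumerated (a sketch, assuming the same large.csv file):

stream =
  File.stream!("large.csv")
  |> Stream.map(fn line ->
    IO.puts("mapping one line")
    String.split(line, ",")
  end)

# nothing has been printed so far: the file hasn't even been opened

Enum.take(stream, 1)
# only now "mapping one line" is printed, once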
To run our stream, we need to use a function that actually enumerates the stream, like Enum.count, Enum.take, Enum.find, Enum.map, Enum.filter, etc.
iex> File.stream!("large.csv") |> Enum.count()
3603137
As soon as Enum.count
tries to loop through the stream, the stream opens the file and starts reading and passing the lines to Enum.count
. If you look at the Erlang Observer, you’ll see that there is almost no memory peak, since the Enum.count
function counts the lines one at a time.
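Another way to run the stream without loading everything is Enum.take/2, which pulls just the first few lines and then stops reading the file:

iex> File.stream!("large.csv") |> Enum.take(2)
# => a list with the header line and the first data row,
#    each still ending with the "\n" character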
Compose our pipeline with streams
It’s now time to build our processing pipeline using Streams, instead of just Enum functions. You’ll see that the code will seem similar, but the way data flows between functions is quite different.
Let’s write the first part of the pipeline.
File.stream!("large.csv")
|> Stream.map(&String.trim(&1))
|> Stream.map(&String.split(&1, ","))
|> Stream.filter(fn
["Timestamp" | _] -> false
[_, "NaN" | _] -> false
[timestamp | _] ->
IO.puts("filter -> #{timestamp}")
true
end)
#Stream<[
  ...
  funs: [#Function<48.51129937/1 in Stream.map/2>,
   #Function<48.51129937/1 in Stream.map/2>,
   #Function<40.51129937/1 in Stream.filter/2>]
]>
This first part of the pipeline returns a stream. The functions we pass to map
and filter
are the same as in the initial example, and they are saved inside the stream processing pipeline. The first two map steps are where we trim each line and split it into columns. The third step is the filter, where we discard the header and the rows with a NaN Open price. Let's now add the final step to the pipeline.
|> Enum.find(fn
  [timestamp | _] ->
    ts = String.to_integer(timestamp)
    dt = DateTime.from_unix!(ts)
    IO.puts("find -> #{timestamp} - #{dt.year}")
    dt.year == 2015
end)
This is the final step, the one that actually runs the pipeline. The function we pass to Enum.find
transforms the timestamp
to a DateTime
struct and returns true
when it finds that the year is 2015. Enum.find receives from the stream only the rows that were already filtered by the previous steps, and once the condition dt.year == 2015 is met, it stops and returns the item.
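Putting the two parts together, the whole lazy pipeline reads like this; running it produces the logs below.

File.stream!("large.csv")
|> Stream.map(&String.trim(&1))
|> Stream.map(&String.split(&1, ","))
|> Stream.filter(fn
  ["Timestamp" | _] -> false
  [_, "NaN" | _] -> false
  [timestamp | _] ->
    IO.puts("filter -> #{timestamp}")
    true
end)
|> Enum.find(fn
  [timestamp | _] ->
    ts = String.to_integer(timestamp)
    dt = DateTime.from_unix!(ts)
    IO.puts("find -> #{timestamp} - #{dt.year}")
    dt.year == 2015
end)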
filter -> 1370321820
find -> 1370321820 - 2013
filter -> 1370321880
find -> 1370321880 - 2013
...
filter -> 1420070340
find -> 1420070340 - 2014
filter -> 1420070400
find -> 1420070400 - 2015
["1420070400", "321", "321", "321",...]
Looking at the logs we've put in the filter and find steps, we see the two stages alternating: each line flows through the whole pipeline, one at a time, until Enum.find meets the dt.year == 2015 condition and stops the enumeration.
Benchmarking with Benchee
Let’s compare the greedy and lazy approaches. We benchmark their speed with benchee while we monitor the memory footprint with the Erlang Observer.
We see that the lazy approach not only has tremendous implications on memory consumption, but it also makes the code much faster compared to the greedy version.
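For reference, here is a sketch of a Benchee setup for this kind of comparison, assuming benchee is added as a dependency and that GreedyVsLazy is a hypothetical module wrapping the two pipelines shown above:

# in mix.exs: {:benchee, "~> 1.0", only: :dev}

Benchee.run(%{
  "greedy (Enum)" => fn -> GreedyVsLazy.greedy("large.csv") end,
  "lazy (Stream)" => fn -> GreedyVsLazy.lazy("large.csv") end
})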