Streaming an avalanche of data with Go’s io package
In Go, using the io package (io.Pipe(), io.Reader/io.WriteCloser) to stream data lets you avoid reading all the data into memory.
If you prefer reading code to my babbling, here's a gist with revisions, and the playground links before and after the refactor (but the last part, The Final Fine-tuning, is not covered).
One fine day I started working on a task to extract a bunch of data based on a user-specified filter, transform it into CSV format, and upload it to a network location.
“That’s rather typical,” thought I naively. So here I went:
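It went roughly like this. The sketch below is a reconstruction for illustration, not the actual production code: the item fields, the filter type, and the upload stub are all placeholders of mine.

```go
package report

import (
	"bytes"
	"context"
	"encoding/csv"
	"fmt"
	"strconv"
)

// item and filter are illustrative stand-ins for the real domain types.
type item struct {
	ID     string
	Amount int64
}

type filter struct {
	Status string
}

// getData loads every matching record into memory at once.
func getData(ctx context.Context, f filter) ([]item, error) {
	// In the real code this queries the database; here it just fakes two rows.
	return []item{{ID: "a-1", Amount: 100}, {ID: "a-2", Amount: 250}}, nil
}

// toCSV turns the whole slice into one in-memory CSV payload.
func toCSV(items []item) ([]byte, error) {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	for _, it := range items {
		if err := w.Write([]string{it.ID, strconv.FormatInt(it.Amount, 10)}); err != nil {
			return nil, err
		}
	}
	w.Flush()
	return buf.Bytes(), w.Error()
}

// upload stands in for the SDK call that ships the payload to the network location.
func upload(ctx context.Context, data []byte) error {
	fmt.Printf("uploading %d bytes\n", len(data))
	return nil
}

// Export wires the three steps together: everything lives in RAM between steps.
func Export(ctx context.Context, f filter) error {
	items, err := getData(ctx, f)
	if err != nil {
		return err
	}
	data, err := toCSV(items)
	if err != nil {
		return err
	}
	return upload(ctx, data)
}
```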
Nice, modular, and testable. (See the Playground link for a demo.) What could go wrong?
Enter load testing…
This was just a few million records, and it's Xendit we're talking about here: a single request could easily hit that number. This wouldn't work.
Solutioning
Let’s identify the root cause of this RAM spike.
Apparently, with the interface getData(context.Context, filter) ([]item, error), I was storing all the items in RAM. To reduce RAM usage, I needed to find a way to stream the data from the DB directly to the network location.
io Package to the Rescue
Taking a closer look, the upload SDK takes an io.Reader as input, like most other Go libraries involving input/output. Therefore, instead of building the whole CSV payload in memory and handing it over as a byte slice, my code should be refactored to hand the uploader an io.Reader.
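Roughly, the boundary shifts like this (placeholder names of my own; the post doesn't show the exact signatures):

```go
package report

import (
	"context"
	"io"
)

// Before: the caller must materialize the whole CSV payload first.
func uploadAll(ctx context.Context, data []byte) error {
	// ... hand the byte slice to the upload SDK ...
	return nil
}

// After: the caller hands over a stream, and the SDK pulls bytes as it uploads.
func uploadStream(ctx context.Context, r io.Reader) error {
	// ... pass r straight through to the SDK ...
	return nil
}
```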
Now I have a Reader; how do I write to it? Go's io package has a function Pipe() that returns a connected pair of Reader (PipeReader) and WriteCloser (PipeWriter).
Hence, I should create the pair, pass the reader to the upload interface, and use another goroutine to perform the writing (if I don’t use a goroutine, the reader and writer will block each other).
To perform the writing, I need to poll the database results and write them to the pipe. So the main logic becomes:
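Here is a self-contained sketch of that shape. The table, column names, SQL placeholder style, and the upload stub are assumptions of mine; the real code is in the linked playground.

```go
package report

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"io"
	"strconv"
)

type filter struct{ Status string }

// getData now hands back the live result set instead of a fully loaded slice.
func getData(ctx context.Context, db *sql.DB, f filter) (*sql.Rows, error) {
	return db.QueryContext(ctx,
		"SELECT id, amount FROM items WHERE status = $1", f.Status)
}

// upload stands in for the SDK call, which accepts an io.Reader.
func upload(ctx context.Context, r io.Reader) error {
	n, err := io.Copy(io.Discard, r) // the real SDK streams this to the network
	fmt.Printf("uploaded %d bytes\n", n)
	return err
}

// Export connects the DB rows to the uploader through io.Pipe, so only a row
// or two is ever held in memory at a time.
func Export(ctx context.Context, db *sql.DB, f filter) error {
	pr, pw := io.Pipe()

	// The writing happens in its own goroutine: a write to the pipe blocks
	// until the reader side consumes it, so doing both in one goroutine
	// would simply deadlock.
	go func() {
		rows, err := getData(ctx, db, f)
		if err != nil {
			pw.CloseWithError(err)
			return
		}
		defer rows.Close()

		w := csv.NewWriter(pw)
		for rows.Next() {
			var id string
			var amount int64
			if err := rows.Scan(&id, &amount); err != nil {
				pw.CloseWithError(err)
				return
			}
			if err := w.Write([]string{id, strconv.FormatInt(amount, 10)}); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
		w.Flush()
		if err := rows.Err(); err != nil {
			pw.CloseWithError(err)
			return
		}
		// Closing the writer (with a nil error) is what gives the reader EOF.
		pw.CloseWithError(w.Error())
	}()

	return upload(ctx, pr)
}
```

The key detail is pw.CloseWithError: it is how the writing goroutine tells the reading side that the stream ended, whether cleanly or with an error.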
The datastore interface would now return the rows:
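Something along these lines, with illustrative names of my own (the post mentions sqlx, hence *sqlx.Rows):

```go
package datastore

import (
	"context"

	"github.com/jmoiron/sqlx"
)

type filter struct{ Status string }

// Datastore now exposes the live result set instead of a materialized slice;
// the caller is responsible for iterating and for calling rows.Close().
type Datastore interface {
	GetData(ctx context.Context, f filter) (*sqlx.Rows, error)
}
```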
It worked! (See the Playground link for an updated demo.)
Most of the allocations came from the upload/reader part. To put a cherry on top, the time consumption was cut in half because the read and write were performed concurrently, albeit still being O(n).
Package Boundary
That's not the end of the story! In reality, I had separated the DB access into another package in the hope of reusing it. This meant the package would expose *sql.Rows (actually I'm using sqlx, so *sqlx.Rows), making it tightly coupled to the database, or even to a specific DB library.
To address this, I decided to communicate with a channel of items instead. So the DB interface changed from exposing the rows directly
to handing back a channel of items:
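One plausible shape for the new interface; the separate error channel is my assumption about how failures get reported back, since the post doesn't show the exact signature:

```go
package datastore

import "context"

type item struct {
	ID     string
	Amount int64
}

type filter struct{ Status string }

// Datastore no longer leaks *sqlx.Rows; callers only ever see domain items.
// The implementation is expected to close both channels once the query is
// exhausted (or has failed), so a plain `range` over items terminates.
type Datastore interface {
	GetData(ctx context.Context, f filter) (<-chan item, <-chan error)
}
```

Whether errors travel on a second channel, inside a wrapper struct, or as a final sentinel item is a design choice; the point is simply that the package boundary now speaks in domain items rather than DB rows.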
And the CSV writer part was updated accordingly; see the Playground link for the updated demo.
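In sketch form, the writer now drains the channel instead of iterating over rows (again assuming the channel shape above, not the exact playground code):

```go
package report

import (
	"encoding/csv"
	"io"
	"strconv"
)

type item struct {
	ID     string
	Amount int64
}

// writeCSV drains the item channel into the pipe writer, then closes the pipe
// so the uploading side sees either EOF or the error that interrupted the query.
func writeCSV(pw *io.PipeWriter, items <-chan item, errs <-chan error) {
	w := csv.NewWriter(pw)
	for it := range items {
		if err := w.Write([]string{it.ID, strconv.FormatInt(it.Amount, 10)}); err != nil {
			pw.CloseWithError(err)
			return
		}
	}
	w.Flush()
	if err := <-errs; err != nil { // nil if the error channel was closed cleanly
		pw.CloseWithError(err)
		return
	}
	pw.CloseWithError(w.Error())
}
```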
Let’s check the performance again!
Yup, almost overlapping red and yellow lines indicated that the refactor didn’t burden the memory much. Time shouldn’t be impacted as I didn’t…
Wait… What? The refactor (in yellow) slowed it down even worse than the original solution. We’ve got a new problem.
The Final Fine-tuning
Why was it slower? Time to pull out the big gun: pprof.
I added the following lines to capture the CPU profile:
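Capturing a CPU profile in a one-off program like this is typically done with runtime/pprof, along these lines (the output file name is an arbitrary choice of mine):

```go
package main

import (
	"log"
	"os"
	"runtime/pprof"
)

func main() {
	// Write the CPU profile to a file that `go tool pprof` can open later.
	f, err := os.Create("cpu.prof")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	if err := pprof.StartCPUProfile(f); err != nil {
		log.Fatal(err)
	}
	defer pprof.StopCPUProfile()

	// ... run the export being measured here ...
}
```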
(See the gist for the final code.) Before the refactor (where *rows is returned), the visualization of pprof partly looked like this:
After that (where a channel is used), it looked like this:
There's a significant amount of time spent on waiting. Per this great article:
Each channel internally holds a mutex lock which is used to avoid data races in all kinds of operations.
I thought buffering the channel might help, and it did.
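The change itself is tiny: give the channel some capacity so the producer doesn't have to synchronize with the consumer on every single item. A sketch (1024 is an arbitrary size of mine):

```go
package report

type item struct {
	ID     string
	Amount int64
}

// newItemChannel is the whole "fix": with a buffer, the DB goroutine can run
// ahead of the CSV writer instead of synchronizing (and contending on the
// channel's internal lock) for every single row.
func newItemChannel() chan item {
	return make(chan item, 1024) // the unbuffered version was make(chan item)
}
```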
Buffered channels (green/orange line) consumed less time than the original solution (blue line), but still more than the solution where *rows was used (red line), and the size of the buffer hardly mattered beyond a certain threshold, because the upload part was able to drain the items.
The memory consumption shouldn’t be impacted much as I didn’t…
No more surprises. All good now.
As a last thought, I think the use of a channel is not well justified: the only reason for it was to separate the DB access part, which, on second thought, might not be reusable anyway because the filter is quite specific to this use case. Hence, I decided to merge the DB access with the main logic and use the *rows directly.
In summary, when a large amount of data is flowing out of your Go system, pass an io.Reader around instead of a fully materialized slice, connect the producing side and the consuming side with io.Pipe(), and do the writing in its own goroutine. This way you won't need to hold all the data in RAM.
Thank you for reading and I hope this helps you!