Streaming an avalanche of data with Go’s io package

In Go, using the io package (io.Pipe(), io.Reader/io.WriteCloser) to stream data lets you avoid reading all of the data into memory.
If you prefer reading code to my babbling, here’s a gist with revisions, plus playground links for before and after the refactor (though the last part, The Final Fine-tuning, is not covered there).

One fine day I started working on a task to extract a bunch of data based on a user-specified filter, transform it into CSV format, and upload it to a network location.

“That’s rather typical,” thought I naively. So here I went:

[Image: the initial implementation]
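For a concrete picture, here is a minimal sketch of what that first version looked like. The names (item, filter, datastore, uploader, exportCSV) are hypothetical stand-ins; the real code is in the gist and playground linked above.

```go
package export

import (
	"bytes"
	"context"
	"encoding/csv"
	"io"
)

// Hypothetical stand-ins for the real domain types.
type item struct{ ID, Name string }
type filter struct{} // user-specified criteria

// The interface from the first version: everything comes back in one slice.
type datastore interface {
	getData(ctx context.Context, f filter) ([]item, error)
}

// The upload SDK accepts an io.Reader.
type uploader interface {
	upload(ctx context.Context, r io.Reader) error
}

func exportCSV(ctx context.Context, db datastore, up uploader, f filter) error {
	items, err := db.getData(ctx, f) // every matching row is now in RAM
	if err != nil {
		return err
	}

	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	for _, it := range items {
		if err := w.Write([]string{it.ID, it.Name}); err != nil {
			return err
		}
	}
	w.Flush()
	if err := w.Error(); err != nil {
		return err
	}

	// The whole CSV also sits in RAM before the upload even starts.
	return up.upload(ctx, &buf)
}
```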

Nice, modular, and testable (see the playground link for a demo). What could go wrong?

Enter load testing…

[Chart: memory consumption during load testing]

This was just a few million records, and it’s Xendit we’re talking about here: a single request could easily hit that number. This wouldn’t work.

Solutioning

Let’s identify the root cause of this RAM spike.

Apparently, with the interface getData(context.Context, filter) ([]item, error), I was holding all the items in RAM at once. To reduce memory usage, I needed a way to stream the data from the DB directly to the network location.

io Package to the Rescue

Taking a closer look, I saw that the upload SDK takes an io.Reader as input, like most Go libraries that deal with input/output. Therefore,

[Image: the original code, building the whole payload in memory]

should be refactored as

[Image: the refactored code, handing an io.Reader to the upload SDK]
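To make the direction of the change concrete, here is one plausible before/after, reusing the hypothetical types from the sketch above; the real signatures are in the playground links.

```go
// Before: the caller receives the whole CSV payload, fully materialized.
type csvExporterBefore interface {
	exportCSV(ctx context.Context, f filter) ([]byte, error)
}

// After: the caller receives a reader the upload SDK can stream from,
// so the payload never needs to exist in memory all at once.
type csvExporterAfter interface {
	exportCSV(ctx context.Context, f filter) (io.Reader, error)
}
```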

Now that I have a Reader, how do I write to it? Go’s io package has a function Pipe() that returns a connected pair: a Reader (*io.PipeReader) and a WriteCloser (*io.PipeWriter).

Hence, I should create the pair, pass the reader to the upload interface, and use another goroutine to perform the writing (if I don’t use a goroutine, the reader and writer will block each other).

To perform the writing, I need to iterate over the database results and write each row to the pipe’s writer. So the main logic becomes

[Image: the refactored main logic using io.Pipe]
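A sketch of that shape, keeping the hypothetical names (getRows is an assumed method, and plain database/sql stands in for the sqlx the author actually uses):

```go
func exportCSV(ctx context.Context, db datastore, up uploader, f filter) error {
	rows, err := db.getRows(ctx, f)
	if err != nil {
		return err
	}
	defer rows.Close()

	pr, pw := io.Pipe()

	// Producer: stream rows into the pipe as CSV from a separate goroutine.
	// Without the goroutine, the first Write would block forever, because
	// nothing would be reading from the other end of the pipe yet.
	go func() {
		w := csv.NewWriter(pw)
		for rows.Next() {
			var it item
			if err := rows.Scan(&it.ID, &it.Name); err != nil {
				pw.CloseWithError(err) // the reader side sees this error
				return
			}
			if err := w.Write([]string{it.ID, it.Name}); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
		w.Flush()
		if err := rows.Err(); err != nil {
			pw.CloseWithError(err)
			return
		}
		// A nil error here closes the pipe cleanly: the reader gets io.EOF.
		pw.CloseWithError(w.Error())
	}()

	// Consumer: the upload SDK drains the pipe as a plain io.Reader, so only
	// a small window of data is ever held in memory.
	return up.upload(ctx, pr)
}
```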

The datastore interface now returns the rows instead:

[Image: the datastore interface returning rows]
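Roughly like this, keeping the hypothetical naming, with *sql.Rows standing in for the *sqlx.Rows the author actually returns:

```go
// The datastore now hands back rows to iterate over, instead of a fully
// materialized slice of items.
type datastore interface {
	getRows(ctx context.Context, f filter) (*sql.Rows, error) // import "database/sql"
}
```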

It worked! (See the playground link for an updated demo.)

[Chart: memory consumption comparison]

Most of the allocations came from the upload/reader part. As a cherry on top, the time consumption was cut in half because the read and the write now happen concurrently, albeit still in O(n).

[Chart: time consumption comparison]

Package Boundary

That’s not the end of the story! In reality, I had separated the DB access into another package in the hope of reusing it. This meant the package would expose *sql.Rows (actually *sqlx.Rows, since I’m using sqlx), making it tightly coupled to the database, or even to a specific DB library.

To address this, I decided to communicate through a channel of items instead. So the DB interface changed from

[Image: the interface returning *sqlx.Rows]

to

[Image: the interface returning a channel of items]
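A hedged sketch of that channel-based boundary and the updated CSV-writing goroutine (names hypothetical, error handling simplified):

```go
// The DB package no longer leaks a database type across its boundary: it
// streams already-scanned items, and reports any scan/iteration failure on a
// separate error channel once the item channel is closed.
type datastore interface {
	getData(ctx context.Context, f filter) (<-chan item, <-chan error)
}

// The producer goroutine now ranges over the channel instead of polling rows.
func writeCSV(pw *io.PipeWriter, items <-chan item, errc <-chan error) {
	w := csv.NewWriter(pw)
	for it := range items {
		if err := w.Write([]string{it.ID, it.Name}); err != nil {
			pw.CloseWithError(err)
			return
		}
	}
	w.Flush()
	if err := <-errc; err != nil { // did the DB side stop because of an error?
		pw.CloseWithError(err)
		return
	}
	pw.CloseWithError(w.Error())
}
```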

And the CSV writer part was updated accordingly; see the playground link for the updated demo.

Let’s check the performance again!

[Chart: memory consumption comparison after the refactor]

Yup, the almost-overlapping red and yellow lines indicate that the refactor didn’t burden the memory much. Time shouldn’t be impacted, as I didn’t…

[Chart: time consumption comparison after the refactor]

Wait… what? The refactor (in yellow) made it even slower than the original solution. We’ve got a new problem.

The Final Fine-tuning

Why did it slow down? Time to pull out the big gun: pprof.

I added the following lines to capture the CPU profile:

[Image: code capturing the CPU profile]
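The standard runtime/pprof incantation looks roughly like this; see the gist for where it actually went in the author’s code.

```go
package main

import (
	"log"
	"os"
	"runtime/pprof"
)

func main() {
	f, err := os.Create("cpu.prof")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Everything between here and the deferred stop ends up in the profile.
	if err := pprof.StartCPUProfile(f); err != nil {
		log.Fatal(err)
	}
	defer pprof.StopCPUProfile()

	// ... run the export under load ...
}
```

The resulting cpu.prof can then be inspected with go tool pprof, e.g. go tool pprof -http=:8080 cpu.prof.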

(See the gist for the final code.) Before the refactor (where *rows is returned), part of the pprof visualization looked like this:

[Image: CPU profile when *rows is returned]

After that (where a channel is used), it looked like this:

[Image: CPU profile when a channel is used]

There’s a significant amount of time spent waiting. As this great article explains,

Each channel internally holds a mutex lock which is used to avoid data races in all kinds of operations.

I thought buffering the channel might help, and it did.
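The change itself is tiny; something like the following, where the buffer size is a hypothetical value to be tuned by benchmarking:

```go
// A buffered channel lets the DB goroutine run ahead of the CSV writer by up
// to bufSize items, so the two sides block on each other far less often.
func newItemStream(bufSize int) chan item {
	return make(chan item, bufSize) // bufSize = 0 restores the old unbuffered behavior
}
```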

[Chart: time consumption comparison with buffered channels]

Buffered channels (green/orange lines) consumed less time than the original solution (blue line), but still more than the solution where *rows was used (red line). The size of the buffer hardly mattered beyond a certain threshold, because the upload side was able to keep draining the items.

Memory consumption shouldn’t be impacted much, as I didn’t…

[Chart: memory consumption comparison with buffered channels]

No more surprises. All good now.

As a last thought, I don’t think the use of a channel is well justified: the only reason for it was to separate the DB access part, which, on second thought, might not be reusable anyway because the filter is quite specific to this use case. Hence, I decided to merge the DB access back into the main logic and use the *rows directly.

In summary, when a large amount of data is flowing out of your Go system:

  1. use io.Pipe() to create a pair of connected io.Reader and io.WriteCloser;
  2. pass the io.Reader to your data output component (e.g., an S3 SDK or a Kafka library);
  3. write the data to the io.WriteCloser in another goroutine (don’t forget to Close it at the end!);
  4. if necessary and feasible, use a buffered channel to pass the data to the writer to facilitate loose coupling.

This way, you won’t need to hold all the data in RAM.

Thank you for reading and I hope this helps you!
