Build Lightning-Fast APIs: Mastering Concurrent Pipelines in Go
Archit Agarwal
PMTS @ Oracle | Golang | Docker | Kubernetes| Typescript | Node.js | .NET | Angular | AWS Certified Cloud Practitioner | Educator | 2 Lac world wide rank on Leetcode
Picture this:
You’re building a batch update API to process 1,000 tasks in a single request. Each task needs to go through four steps: validation, audit, database update, and notification.
Sounds simple, right? Well, here’s the catch: each step takes 100 milliseconds per task. Multiply that by 1,000 tasks across 4 steps, and you’re staring at 400 seconds, more than 6 minutes, to process a single batch. And let’s be real, nobody’s waiting that long without refreshing the page in frustration (or questioning your API design).
Now, for the good news: Go’s concurrency model can rescue you from this latency abyss. With Concurrent Pipelines, you can ideally cut processing time down to around 400 milliseconds (4 steps × 100 ms), largely independent of batch size, as long as the machine and the downstream services can absorb that much concurrency. Let’s dive into the magic behind this.
What Are Concurrent Pipelines?
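In a concurrent pipeline, tasks don’t wait for all of Stage 1 to finish before moving to Stage 2. Instead, tasks flow through the pipeline like water in a series of interconnected pipes: as soon as a task leaves Stage 1 it enters Stage 2, while Stage 1 is already working on the next task.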
This overlap allows all stages to work simultaneously, maximizing your CPU’s resources while slashing overall processing time. It's like a factory assembly line where each worker does their job without waiting for the one before to finish everything.
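The assembly-line idea can be sketched with two tiny stages connected by channels. This is a minimal illustration, not the article's final code: the names stage, source, and collect are hypothetical, and each stage here uses a single goroutine so the output order stays predictable.

```go
package main

import (
	"fmt"
	"time"
)

// source emits the given values on a channel and then closes it.
func source(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// stage applies fn to every value from in, using its own goroutine,
// so it can work on one value while the next stage handles the previous one.
func stage(in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			time.Sleep(10 * time.Millisecond) // simulated work
			out <- fn(v)
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect(in <-chan int) []int {
	var result []int
	for v := range in {
		result = append(result, v)
	}
	return result
}

func main() {
	double := func(v int) int { return v * 2 }
	addOne := func(v int) int { return v + 1 }

	start := time.Now()
	fmt.Println(collect(stage(stage(source(1, 2, 3), double), addOne))) // prints [3 5 7]
	fmt.Println("elapsed:", time.Since(start).Round(10*time.Millisecond))
}
```

While the second stage is adding one to the first value, the first stage is already doubling the second value, which is exactly the overlap described above.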
Building a Concurrent Pipeline in Go
Here’s how you can implement a Concurrent Pipeline in Go using channels and goroutines.
Step 1: Define Your Tasks
First, we represent the update task request structure with PutTask:
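package main

// Imports used by the snippets in this article.
import (
	"fmt"
	"sync"
	"time"
)

// taskTypeEnum lists the task types a task can be converted to.
type taskTypeEnum string

const (
	taskTypeEpic     taskTypeEnum = "Epic"
	taskTypeStory    taskTypeEnum = "Story"
	taskTypeSubTask  taskTypeEnum = "SubTask"
	taskTypeTechTask taskTypeEnum = "TechTask"
)

// PutTask is a single update request inside the batch.
type PutTask struct {
	taskId  int
	toType  taskTypeEnum
	summary string
}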
Step 2: Create the Pipeline Stages
In this section, we’ll break down the pipeline generator and its supporting components. The goal is to process tasks concurrently while ensuring smooth and efficient stage transitions. Let’s dive into the code.
Each stage processes tasks concurrently. Here's a reusable function for setting up a pipeline stage:
// f is the signature every stage function must satisfy.
type f func(task PutTask) bool

func pipelineGenerator(done <-chan bool, readDataStream <-chan PutTask, failedDataStream chan<- PutTask, stageTaskProcesser f) <-chan PutTask {
	stream := make(chan PutTask) // --- 1
	go func() {
		pushWg := &sync.WaitGroup{}
		defer close(stream)
		for {
			select {
			case task, ok := <-readDataStream: // --- 2
				if !ok {
					pushWg.Wait() // --- 3
					return
				}
				pushWg.Add(1)
				go genericStageTaskProcessor(done, pushWg, task, stream, failedDataStream, stageTaskProcesser) // --- 4
			case <-done:
				// Let in-flight workers exit before the deferred close(stream),
				// otherwise a late send would hit a closed channel.
				pushWg.Wait()
				return
			}
		}
	}()
	return stream
}
Breaking It Down
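Parameters:
done: a receive-only channel; closing it tells the stage to stop.
readDataStream: the input channel, fed by the previous stage (or by the initial task list).
failedDataStream: a send-only channel, shared by all stages, that collects the tasks a stage rejects.
stageTaskProcesser: the function of type f that does the stage’s actual work and returns true on success.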
What’s Happening Here?
1. Creating the Stream: Each stage gets its own output stream, which serves as the input for the next stage in the pipeline.
2. Reading Input Data: The readDataStream channel receives tasks from the previous stage. As long as data is flowing, the pipeline keeps processing.
3. Waiting for Goroutines to Finish: When the input stream closes, we wait for all active goroutines to complete processing before exiting.
4. Concurrent Task Processing: For every incoming task, a new goroutine is spawned. This allows multiple tasks to be processed simultaneously, ensuring that slower tasks don’t block others.
The Helper Function: genericStageTaskProcessor
func genericStageTaskProcessor(done <-chan bool, wg *sync.WaitGroup, t PutTask, outputStream, failedDataStream chan<- PutTask, stageTaskProcesser f) {
	defer wg.Done()
	duration := 100 * time.Millisecond
	if t.taskId%5 == 0 {
		duration = 500 * time.Millisecond // --- 5
	}
	// Simulate processing time, but stop early if the pipeline is cancelled.
	select {
	case <-done:
		return
	case <-time.After(duration):
	}
	// Process the task and pick the stream that matches the outcome.
	target := failedDataStream
	if stageTaskProcesser(t) {
		target = outputStream
	}
	// Send the result, giving up if the pipeline is cancelled in the meantime.
	select {
	case target <- t: // --- 6
	case <-done:
	}
}
What’s Going On?
This function represents a worker who processes each task individually. Let’s break it down:
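5. Adding Processing Delays: To mimic real-world variability, some tasks (those where taskId % 5 == 0) are delayed longer than others. This helps simulate outliers in processing times.
6. Pushing Results to Streams: If the stage function returns true, the task is sent to the output stream for the next stage; otherwise it is sent to failedDataStream.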
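The routing in step 6 can be sketched on its own. This is an illustrative helper, not part of the original code: route is a hypothetical name, and it wraps the send in a select so that a worker cannot get stuck forever when nobody is reading and the pipeline has been cancelled.

```go
package main

import "fmt"

// route sends v to okStream when passed is true, otherwise to failedStream.
// It returns false if done is closed before the send can complete.
func route(done <-chan bool, v int, passed bool, okStream, failedStream chan<- int) bool {
	target := failedStream
	if passed {
		target = okStream
	}
	select {
	case target <- v:
		return true
	case <-done:
		return false
	}
}

func main() {
	done := make(chan bool)
	okStream := make(chan int, 1)
	failedStream := make(chan int, 1)

	route(done, 1, true, okStream, failedStream)
	fmt.Println("ok stream got:", <-okStream) // prints "ok stream got: 1"

	route(done, 2, false, okStream, failedStream)
	fmt.Println("failed stream got:", <-failedStream) // prints "failed stream got: 2"

	// After cancellation, a send that nobody would receive is abandoned.
	close(done)
	unread := make(chan int)
	fmt.Println("sent after cancel:", route(done, 3, true, unread, failedStream)) // prints "sent after cancel: false"
}
```

Without the done case, a worker whose consumer has gone away would block on the send and leak.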
Key Highlights of the Pipeline
1. Efficient Streaming: Each stage processes its input stream concurrently, without holding up subsequent tasks. If a task takes longer to process, the pipeline doesn’t block other tasks from advancing to the next stage.
2. Error Handling: By separating failed tasks into the failedDataStream, we can handle them without affecting the flow of successful tasks.
3. Concurrency Done Right: The use of sync.WaitGroup ensures all goroutines complete their work before shutting down.
Additionally, the done channel enables a clean and controlled shutdown of the pipeline.
Here’s how a single stage might look (e.g., validation):
// validateTasks is the stage function for validation; task 1 is rejected to demonstrate a failure.
func validateTasks(task PutTask) bool {
	if task.taskId == 1 {
		fmt.Println("Validation failed for task id:", task.taskId)
		return false
	}
	fmt.Println("Validation done for task id:", task.taskId)
	return true
}
Similarly, we implement the other stages (Audit, Update, Notify).
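The remaining stage functions only need to match the same signature. The bodies below are placeholders I am assuming for illustration (each one just logs and reports success); the real implementations would talk to an audit store, a database, and a notification service. PutTask is repeated in trimmed form so the sketch runs on its own.

```go
package main

import "fmt"

// PutTask is a trimmed copy of the article's struct, repeated for self-containment.
type PutTask struct {
	taskId  int
	summary string
}

// createAudit is a placeholder for writing an audit record (assumed to succeed).
func createAudit(task PutTask) bool {
	fmt.Println("Audit created for task id:", task.taskId)
	return true
}

// updateTasks is a placeholder for the database update (assumed to succeed).
func updateTasks(task PutTask) bool {
	fmt.Println("Update done for task id:", task.taskId)
	return true
}

// NotifyForTasks is a placeholder for sending a notification (assumed to succeed).
func NotifyForTasks(task PutTask) bool {
	fmt.Println("Notification sent for task id:", task.taskId)
	return true
}

func main() {
	task := PutTask{taskId: 2, summary: "demo"}
	// Run the three stage functions in order, as the pipeline would for one task.
	fmt.Println(createAudit(task) && updateTasks(task) && NotifyForTasks(task))
}
```

Because every stage shares one signature, adding or reordering stages is just a matter of passing a different function to the stage generator.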
Step 3: Connect the Stages
Here’s how the pipeline comes together:
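// convertToStream turns the slice of tasks into a channel that feeds the first stage.
func convertToStream(done <-chan bool, arr []PutTask) <-chan PutTask {
	stream := make(chan PutTask, len(arr))
	go func() {
		// Close the stream on every exit path, including cancellation.
		defer close(stream)
		for _, task := range arr {
			select {
			case <-done:
				return
			case stream <- task:
			}
		}
	}()
	return stream
}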
func main() {
	// initialize server and handler
	// start server
	// handle request
	// get tasks from request
	startTime := time.Now()
	listOfTasks := []PutTask{
		{taskId: 1, toType: taskTypeEpic},
		{taskId: 2, toType: taskTypeStory},
		{taskId: 3, toType: taskTypeSubTask},
		{taskId: 4, toType: taskTypeTechTask},
		{taskId: 5, summary: "This is an updated task summary for task 5"},
		{taskId: 6, summary: "The updated summary for task 6"},
	}
	done := make(chan bool)
	defer close(done)
	failedTaskDataStream := make(chan PutTask)
	// Convert slice to channel
	stream := convertToStream(done, listOfTasks)
	// Stage 1: Validation
	validatedTaskStream := pipelineGenerator(done, stream, failedTaskDataStream, validateTasks)
	// Stage 2: Audit
	auditTaskStream := pipelineGenerator(done, validatedTaskStream, failedTaskDataStream, createAudit)
	// Stage 3: Database Update
	updatedTaskStream := pipelineGenerator(done, auditTaskStream, failedTaskDataStream, updateTasks)
	// Stage 4: Notify
	notifiedTaskStream := pipelineGenerator(done, updatedTaskStream, failedTaskDataStream, NotifyForTasks)
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Consume the final output and the failed tasks to complete the pipeline;
		// a real handler would collect these and return them to the caller.
		// Once notifiedTaskStream closes, every stage has finished, so no more failures can arrive.
		for {
			select {
			case task, ok := <-notifiedTaskStream:
				if !ok {
					return
				}
				fmt.Println("Task completed, id:", task.taskId)
			case task := <-failedTaskDataStream:
				fmt.Println("Task failed, id:", task.taskId)
			}
		}
	}()
	wg.Wait()
	fmt.Println("---- All work done", time.Since(startTime), "------")
}
You can find the complete working code on my GitHub - https://github.com/architagr/The-Weekly-Golang-Journal/tree/main/pipeline/final_optimization
Why Concurrent Pipelines Are Lightning-Fast
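Instead of processing tasks one stage at a time, each stage works simultaneously. This overlap dramatically reduces overall processing time. For example, with 1,000 tasks, 4 stages, and 100 ms per step, a strictly sequential run takes 1,000 × 4 × 100 ms = 400 seconds, while the pipeline ideally finishes in about 4 × 100 ms = 400 ms, because all tasks move through the stages in parallel.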
Tips for Building Robust Concurrent Pipelines
Wrapping Up
Concurrent Pipelines are a game-changer for high-performance systems. By leveraging Go’s powerful concurrency primitives, you can handle massive workloads with incredible speed and efficiency.
Want to see this code in action? https://github.com/architagr/The-Weekly-Golang-Journal/tree/main/pipeline/final_optimization
Keep Learning
Enjoyed this article? Consider subscribing to The Weekly Golang Journal for more practical insights and examples like this: Subscribe Here.
Let me know your thoughts or questions in the comments, or connect with me on LinkedIn. Let’s keep building better systems together!