Build Lightning-Fast APIs: Mastering Concurrent Pipelines in Go

Picture this:

You’re building a batch update API to process 1,000 tasks in a single request. Each task needs to go through these steps:

  1. Validate the input data.
  2. Log an audit entry.
  3. Update the database.
  4. Notify the user.

Sounds simple, right? Well, here's the catch: each step takes 100 milliseconds per task. Multiply that by 1,000 tasks across 4 steps, and you're staring at roughly 6.7 minutes to process a single batch. And let's be real, nobody's waiting that long without refreshing the page in frustration (or questioning your API design).

Now for the good news: Go's concurrency model can rescue you from this latency abyss. With Concurrent Pipelines, you can cut processing time down to roughly 400 milliseconds in the ideal case, no matter how large the batch. Let's dive into the magic behind this.

What Are Concurrent Pipelines?

In a concurrent pipeline, tasks don’t wait for all of Stage 1 to finish before moving to Stage 2. Instead, tasks flow through the pipeline like water in a series of interconnected pipes:

  • A task finishes validation and immediately flows to the audit stage.
  • While that task is being audited, another task is already being validated.
  • And so on...

This overlap allows all stages to work simultaneously, maximizing your CPU’s resources while slashing overall processing time. It's like a factory assembly line where each worker does their job without waiting for the one before to finish everything.

Building a Concurrent Pipeline in Go

Here’s how you can implement a Concurrent Pipeline in Go using channels and goroutines.

Step 1: Define Your Tasks

First, we represent the update task request structure with PutTask:

type taskTypeEnum string

const (
	taskTypeEpic     taskTypeEnum = "Epic"
	taskTypeStory    taskTypeEnum = "Story"
	taskTypeSubTask  taskTypeEnum = "SubTask"
	taskTypeTechTask taskTypeEnum = "TechTask"
)

type PutTask struct {
	taskId  int
	toType  taskTypeEnum
	summary string
}        

Step 2: Create the Pipeline Stages

In this section, we’ll break down the pipeline generator and its supporting components. The goal is to process tasks concurrently while ensuring smooth and efficient stage transitions. Let’s dive into the code.

Each stage processes tasks concurrently. Here's a reusable function for setting up a pipeline stage:

type f func(task PutTask) bool

func pipelineGenerator(done <-chan bool, readDataStream <-chan PutTask, failedDataStream chan<- PutTask, stageTaskProcessor f) <-chan PutTask {
	stream := make(chan PutTask) // (1)
	go func() {
		pushWg := &sync.WaitGroup{}
		defer close(stream)
		for {
			select {
			case task, ok := <-readDataStream: // (2)
				if !ok {
					pushWg.Wait() // (3)
					return
				}
				pushWg.Add(1)
				go genericStageTaskProcessor(done, pushWg, task, stream, failedDataStream, stageTaskProcessor) // (4)
			case <-done:
				pushWg.Wait() // let in-flight workers observe done before stream closes
				return
			}
		}
	}()
	return stream
}

Breaking It Down

Parameters:

  1. done: A signal channel to shut down the pipeline gracefully, avoiding goroutine leaks.
  2. readDataStream: Input channel providing tasks for this stage of the pipeline.
  3. failedDataStream: Output channel for tasks that fail to process, enabling the caller to handle these failures.
  4. stageTaskProcessor: A function defining the logic to process each task at this stage.

What’s Happening Here?

(1) Creating the Stream: Each stage gets its own output stream, which serves as the input for the next stage in the pipeline.

(2) Reading Input Data: The readDataStream channel receives tasks from the previous stage. As long as data is flowing, the pipeline keeps processing.

(3) Waiting for Goroutines to Finish: When the input stream closes (or done fires), we wait for all active goroutines to complete before returning, so the deferred close(stream) runs only after every worker is done.

(4) Concurrent Task Processing: Every incoming task gets its own goroutine. This allows multiple tasks to be processed simultaneously, ensuring that slower tasks don't block others.

The Helper Function: genericStageTaskProcessor

func genericStageTaskProcessor(done <-chan bool, wg *sync.WaitGroup, t PutTask, outputStream, failedDataStream chan<- PutTask, stageTaskProcessor f) {
	defer wg.Done()
	duration := 100 * time.Millisecond
	if t.taskId%5 == 0 {
		duration = 500 * time.Millisecond // (5)
	}
	// Simulate processing time.
	timer := time.After(duration)
	select {
	case <-done:
		return
	case <-timer:
		// Process the task; guard each send with done so a cancelled
		// pipeline cannot strand a blocked sender.
		if stageTaskProcessor(t) {
			select {
			case outputStream <- t: // (6)
			case <-done:
			}
			return
		}
		select {
		case failedDataStream <- t:
		case <-done:
		}
	}
}

What’s Going On?

This function represents a worker who processes each task individually. Let’s break it down:

(5) Adding Processing Delays: To mimic real-world variability, some tasks (those where taskId % 5 == 0) take longer than others. This helps simulate outliers in processing times.

(6) Pushing Results to Streams:

  • If a task is successfully processed (stageTaskProcessor(t) returns true), it’s sent to the outputStream for the next stage.
  • If it fails, it’s sent to the failedDataStream, allowing the caller to track or retry failed tasks.

Key Highlights of the Pipeline

(1) Efficient Streaming: Each stage processes its input stream concurrently, without holding up subsequent tasks. If a task takes longer to process, the pipeline doesn’t block other tasks from advancing to the next stage.

(2) Error Handling: By separating failed tasks into the failedDataStream, we can handle them without affecting the flow of successful tasks.

(3) Concurrency Done Right: The use of sync.WaitGroup ensures all goroutines complete their work before shutting down.

Additionally, the done channel enables a clean and controlled shutdown of the pipeline.
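To see that shutdown path on its own, here is a small sketch. cancelAfter is a hypothetical helper (not part of the article’s code) that closes done after a deadline, the way a request timeout would; once it fires, every stage’s case <-done branch unblocks, the workers return, and each deferred close(stream) runs:

// cancelAfter returns a done channel that closes itself after d,
// simulating a request deadline. (Hypothetical helper for illustration.)
func cancelAfter(d time.Duration) <-chan bool {
	done := make(chan bool)
	go func() {
		time.Sleep(d)
		close(done)
	}()
	return done
}

If you wire this in later instead of done := make(chan bool) in main, remember to drop the defer close(done) there, since closing an already-closed channel panics.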

Here’s how a single stage might look (e.g., validation):

func validateTasks(task PutTask) bool {
	if task.taskId == 1 {
		fmt.Println("Validation failed for task id: ", task.taskId)
		return false
	}
	fmt.Println("Validation done for task id: ", task.taskId)
	return true
}        

Similarly, we implement the other stages (Audit, Update, Notify).
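For completeness, here is a minimal sketch of the remaining stage functions. The names (createAudit, updateTasks, NotifyForTasks) come from the wiring in main below; the bodies are illustrative stand-ins, since the real implementations would write an audit record, run the database update, and send the notification:

// Illustrative stand-ins for the remaining stages; real implementations
// would perform the actual I/O for each step.
func createAudit(task PutTask) bool {
	fmt.Println("Audit logged for task id:", task.taskId)
	return true
}

func updateTasks(task PutTask) bool {
	fmt.Println("Database updated for task id:", task.taskId)
	return true
}

func NotifyForTasks(task PutTask) bool {
	fmt.Println("Notification sent for task id:", task.taskId)
	return true
}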

Step 3: Connect the Stages

Here’s how the pipeline comes together:

func convertToStream(done <-chan bool, arr []PutTask) <-chan PutTask {
	stream := make(chan PutTask, len(arr))
	go func() {
		// Close the stream even if done fires first, so downstream
		// stages are not left waiting on an open channel.
		defer close(stream)
		for _, task := range arr {
			select {
			case <-done:
				return
			case stream <- task:
			}
		}
	}()
	return stream
}

func main() {
	// initialize server and handler
	// start server
	// handle request
	// get tasks from request
	startTime := time.Now()
	listOfTasks := []PutTask{
		{taskId: 1, toType: taskTypeEpic},
		{taskId: 2, toType: taskTypeStory},
		{taskId: 3, toType: taskTypeSubTask},
		{taskId: 4, toType: taskTypeTechTask},
		{taskId: 5, summary: "This is an updated task summary for task 5"},
		{taskId: 6, summary: "The updated summary for task 6"},
	}
	done := make(chan bool)
	defer close(done)
	failedTaskDataStream := make(chan PutTask)

	// Convert slice to channel
	stream := convertToStream(done, listOfTasks)

	// Stage 1: Validation
	validatedTaskStream := pipelineGenerator(done, stream, failedTaskDataStream, validateTasks)

	// Stage 2: Audit
	auditTaskStream := pipelineGenerator(done, validatedTaskStream, failedTaskDataStream, createAudit)

	// Stage 3: Database Update
	updatedTaskStream := pipelineGenerator(done, auditTaskStream, failedTaskDataStream, updateTasks)

	// Stage 4: Notify
	notifiedTaskStream := pipelineGenerator(done, updatedTaskStream, failedTaskDataStream, NotifyForTasks)

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Consume the final output from notifiedTaskStream and
		// failedTaskDataStream. Every task ends up on exactly one of
		// the two streams, so stop once all are accounted for.
		for seen := 0; seen < len(listOfTasks); seen++ {
			select {
			case task := <-notifiedTaskStream:
				fmt.Println("Pipeline completed for task id:", task.taskId)
			case task := <-failedTaskDataStream:
				fmt.Println("Pipeline failed for task id:", task.taskId)
			}
		}
	}()
	wg.Wait()

	fmt.Println("---- All work done ", time.Since(startTime), "------")
}
        

You can find the complete working code on my GitHub - https://github.com/architagr/The-Weekly-Golang-Journal/tree/main/pipeline/final_optimization

Why Concurrent Pipelines Are Lightning-Fast

Instead of processing tasks one stage at a time, each stage works simultaneously. This overlap dramatically reduces overall processing time. For example:

  • Processed sequentially, each task takes 4 × 100 ms = 400 ms, so 1,000 tasks take about 400 seconds, roughly 6.7 minutes.
  • A Concurrent Pipeline overlaps tasks across stages, so the whole batch finishes in roughly the time one task needs to traverse all four stages: about 400 ms, regardless of batch size (in the ideal case, ignoring scheduling overhead and resource limits).

Tips for Building Robust Concurrent Pipelines

  1. Buffered Channels: If a downstream stage processes tasks slower than an upstream stage, buffered channels can prevent blocking (see the sketch after this list).
  2. Error Handling: Ensure that errors in one task don’t disrupt the pipeline. You can route failed tasks to a separate channel for retries or logging.
  3. Resource Management: Limit the number of goroutines to prevent resource exhaustion. Use worker pools or semaphore patterns if needed, as in the sketch after this list.
  4. Profiling: Regularly profile your pipeline with tools like pprof to identify bottlenecks and optimize performance.
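As a minimal sketch of tips 1 and 3 combined, here is a bounded variant of pipelineGenerator. The boundedPipelineGenerator name and the maxWorkers parameter are my own additions for illustration: a semaphore channel caps the number of in-flight goroutines, and the buffered output channel gives a momentarily slow consumer some slack:

// boundedPipelineGenerator is an illustrative variant of pipelineGenerator
// that caps per-stage concurrency (tip 3) and buffers its output (tip 1).
func boundedPipelineGenerator(done <-chan bool, readDataStream <-chan PutTask, failedDataStream chan<- PutTask, stageTaskProcessor f, maxWorkers int) <-chan PutTask {
	stream := make(chan PutTask, maxWorkers) // buffered output smooths bursts
	sem := make(chan struct{}, maxWorkers)   // semaphore: at most maxWorkers workers in flight
	go func() {
		pushWg := &sync.WaitGroup{}
		defer close(stream)
		for {
			select {
			case task, ok := <-readDataStream:
				if !ok {
					pushWg.Wait()
					return
				}
				sem <- struct{}{} // acquire a slot; blocks while the stage is saturated
				pushWg.Add(1)
				go func(t PutTask) {
					defer func() { <-sem }() // release the slot when the worker finishes
					genericStageTaskProcessor(done, pushWg, t, stream, failedDataStream, stageTaskProcessor)
				}(task)
			case <-done:
				pushWg.Wait()
				return
			}
		}
	}()
	return stream
}

Swapping this in for any stage is a one-line change in main; the rest of the pipeline stays exactly the same.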

Wrapping Up

Concurrent Pipelines are a game-changer for high-performance systems. By leveraging Go’s powerful concurrency primitives, you can handle massive workloads with incredible speed and efficiency.

Want to see this code in action? https://github.com/architagr/The-Weekly-Golang-Journal/tree/main/pipeline/final_optimization

Keep Learning

Enjoyed this article? Consider subscribing to The Weekly Golang Journal for more practical insights and examples like this: Subscribe Here.

Let me know your thoughts or questions in the comments, or connect with me on LinkedIn. Let’s keep building better systems together!
