Concurrency Patterns in Go | Part 1

Go provides several powerful concurrency patterns for designing efficient, scalable concurrent programs. In this first part, we will delve into three fundamental patterns, each accompanied by a real-world use case.

Pipeline Pattern

The pipeline pattern in Go is a concurrency pattern for processing large amounts of data efficiently. It involves dividing a complex task into smaller stages, each executed concurrently by a separate goroutine. The output of one stage is passed as input to the next stage through channels, forming a pipeline.
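Before the order-processing example below, here is a minimal, self-contained sketch of the idea (the gen and square stage names are illustrative, not from the example that follows): each stage owns its output channel, closes it when done, and stages compose by chaining channels.

```go
package main

import "fmt"

// gen emits the given numbers on a channel and closes it when done.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square reads from in, squares each value, and forwards it downstream.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	for v := range square(gen(2, 3)) {
		fmt.Println(v) // prints 4, then 9
	}
}
```

Because each stage closes its output channel when its input is drained, the final `range` loop terminates cleanly once all data has flowed through.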

Use Case

Let's say you have a system that needs to process a large dataset of customer orders. Each order goes through several stages of processing, such as validation, enrichment, and final calculation. Instead of sequentially processing each order, you can utilize the pipeline pattern to parallelize the processing and improve overall efficiency.

Here's how the pipeline pattern can be used in this scenario:

  1. Stage 1 (Validation): Each order is received and validated for correctness and completeness. Any invalid orders are filtered out.
  2. Stage 2 (Enrichment): The valid orders are then enriched with additional information, such as customer details or product data, to enhance their content.
  3. Stage 3 (Calculation): The enriched orders are processed further to perform calculations, such as total order value or shipping costs.

Example

Assuming the Order type looks like this:

type Order struct {
	ID     int
	Amount float64
}

Stage 1 (Validation)

Each order is received and validated for correctness and completeness. Any invalid orders are filtered out.

func validateOrders(input <-chan Order, output chan<- Order) {
   defer close(output) // Close the output channel when the function finishes

   for order := range input {
      // Simulating validation
      time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

      // Validate order
      if order.Amount > 0 {
         output <- order
      }
   }
}

Stage 2 (Enrichment)

The valid orders are then enriched with additional information, such as customer details or product data, to enhance their content.

func enrichOrders(input <-chan Order, output chan<- Order) {
   defer close(output) // Close the output channel when the function finishes

   for order := range input {
      // Simulating enrichment
      time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

      // Enrich order
      order.Amount *= 1.1
      output <- order
   }
}

Stage 3 (Calculation)

The enriched orders are processed further to perform calculations, such as total order value or shipping costs.

func calculateOrderValues(input <-chan Order, output chan<- Order) {
   defer close(output) // Close the output channel when the function finishes
   
   for order := range input {
      // Simulating calculation
      time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

      // Perform calculation
      order.Amount += 5.0
      output <- order
   }
}

Each stage is implemented as a separate goroutine, and channels are used to connect the stages. The orders flow through the pipeline, with each stage concurrently processing the orders it receives. This allows for parallelism, as multiple orders can be processed simultaneously by different stages.

func main() {
	rand.Seed(time.Now().UnixNano())

	// Create channels to connect the stages
	orders := make(chan Order)
	validOrders := make(chan Order)
	enrichedOrders := make(chan Order)
	calculatedOrders := make(chan Order)

	// Start the stages concurrently
	go validateOrders(orders, validOrders)
	go enrichOrders(validOrders, enrichedOrders)
	go calculateOrderValues(enrichedOrders, calculatedOrders)

	// Generate sample orders
	go func() {
		for i := 1; i <= 10; i++ {
			order := Order{
				ID:     i,
				Amount: float64(i * 10),
			}
			orders <- order
		}
		close(orders)
	}()

	// Receive the final output from the pipeline
	for order := range calculatedOrders {
		fmt.Printf("Processed order ID: %d, Final amount: %.2f\n", order.ID, order.Amount)
	}
}

By using the pipeline pattern, you can efficiently process a large number of orders in a scalable and concurrent manner. This can significantly reduce the overall processing time and improve the performance of your system.

Fan-Out, Fan-In Pattern

So you've got a pipeline set up. Data is flowing through your system beautifully, transforming as it makes its way through the stages you've chained together. It's a beautiful order-processing pipeline; a beautiful, slow pipeline. Oh no, why is this taking so long?

Sometimes, stages in your pipeline can be particularly computationally expensive. Imagine you have many attributes to validate, and Stage 1 (Validation) is taking longer than expected. You can solve this with another pattern: fan-out, fan-in.

Here's how the fan-out, fan-in pattern works:

  • Fan-Out: The input data is divided into smaller chunks, and each chunk is processed concurrently by a separate goroutine or stage. This allows for parallel processing of the workload.

Fig: Fan-Out

  • Fan-In: The results produced by the concurrent goroutines or stages are collected and merged into a single output channel or data structure. This consolidation combines the individual results into a final result.

Fig: Fan-In

Let's apply the fan-out, fan-in pattern to the validateOrders stage:

func validateOrders(input <-chan Order, output chan<- Order) {
   defer close(output) // Close the output channel when the function finishes

   // Create a channel to receive validated orders
   validatedOrders := make(chan Order)

   // Specify the number of worker goroutines to use for fan-out
   workerCount := 3

   // Fan-out: Start multiple worker goroutines
   var wg sync.WaitGroup
   for i := 0; i < workerCount; i++ {
      wg.Add(1)
      go func(input <-chan Order) {
         defer wg.Done()
         for order := range input {
            // Simulating validation
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

            // Validate order
            if order.Amount > 0 {
               validatedOrders <- order
            }
         }
      }(input)
   }

   // Fan-in: Collect the validated orders from the workers
   go func() {
      wg.Wait()
      close(validatedOrders)
   }()

   for order := range validatedOrders {
      output <- order
   }
}

The fan-out, fan-in pattern is particularly useful when you have a large amount of data or a computationally intensive task that can be divided into independent subtasks. By distributing the work across multiple goroutines, as in the validateOrders stage above, you can take advantage of parallel processing and improve overall performance.
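The fan-in step can also be factored into a reusable helper. Here is a sketch of a generic merge function (the name merge is my own, not from the article; Order is redefined so the snippet compiles on its own) that combines any number of input channels into one output channel and closes it only once every input is drained:

```go
package main

import (
	"fmt"
	"sync"
)

type Order struct {
	ID     int
	Amount float64
}

// merge fans in any number of Order channels into a single channel.
// The returned channel is closed once all input channels are drained.
func merge(inputs ...<-chan Order) <-chan Order {
	out := make(chan Order)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(c <-chan Order) {
			defer wg.Done()
			for order := range c {
				out <- order
			}
		}(in)
	}
	// Close the output channel only after all forwarding goroutines finish.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a := make(chan Order)
	b := make(chan Order)
	go func() { a <- Order{ID: 1, Amount: 10}; close(a) }()
	go func() { b <- Order{ID: 2, Amount: 20}; close(b) }()

	total := 0.0
	for order := range merge(a, b) {
		total += order.Amount
	}
	fmt.Println(total) // prints 30
}
```

With such a helper, each worker in the fan-out step can write to its own channel, and merge replaces the hand-rolled wg.Wait/close block inside validateOrders.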

Worker Pool Pattern

Hurrah! You're fully prepared and ready to enter production.

Wow! Your exceptional work pays off as your product becomes a sensation, going viral and generating a surge of orders that require rapid validation.

In the case of the validateOrders function, the worker pool pattern is beneficial when you have a potentially large number of orders to validate and want to control how many goroutines execute the validation.

The Worker Pool pattern is useful in scenarios where you have a large number of tasks to be processed concurrently, and you want to limit the number of active goroutines to avoid resource exhaustion.

Fig: Worker Pool

By introducing a worker pool, you can limit the number of active goroutines to a predefined count (e.g., workerCount). The worker goroutines in the pool can process the orders concurrently, with each worker picking up an order from the task queue as soon as it becomes available. This ensures that the number of active goroutines is controlled and resources are utilized efficiently.

func validateOrders(input <-chan Order, output chan<- Order) {
   defer close(output) // Close the output channel when the function finishes

   // Specify the number of worker goroutines to use in the worker pool
   workerCount := 3

   // Create a wait group to track the completion of the worker goroutines
   var wg sync.WaitGroup

   // Create a buffered channel to limit the number of tasks that can be submitted to the worker pool
   taskQueue := make(chan Order, workerCount)

   // Create a channel to collect the validated orders from the worker goroutines
   validatedOrders := make(chan Order)

   // Start worker goroutines
   for i := 0; i < workerCount; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         for order := range taskQueue {
            // Simulating validation
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

            // Validate order
            if order.Amount > 0 {
               validatedOrders <- order
            }
         }
      }()
   }

   // Enqueue tasks to the task queue (Fan-out)
   go func() {
      for order := range input {
         taskQueue <- order
      }
      close(taskQueue)
   }()

   // Collect validated orders from the worker goroutines (Fan-in)
   go func() {
      wg.Wait()              // Wait for all worker goroutines to finish
      close(validatedOrders) // Close the channel after all orders have been processed
   }()

   // Collect the validated orders from the channel and send them to the output channel
   for order := range validatedOrders {
      output <- order
   }
}

Conclusion

In summary, the final code snippet demonstrates the pipeline pattern by flowing tasks from the input channel to the worker goroutines through the task queue. It incorporates the fan-out pattern by distributing tasks among multiple worker goroutines, and the worker pool pattern by creating a fixed number of workers to process tasks concurrently. Combining these patterns enables efficient, concurrent processing of orders, leveraging parallelism and task distribution among worker goroutines, resulting in improved throughput and scalability.

In part 2, we will explore techniques that allow us to take this synchronization one step further.


I'm constantly delighted to receive feedback. Whether you spot an error, have a suggestion for improvement, or just want to share your thoughts, please don't hesitate to comment/reach out. I truly value connecting with readers!
