Concurrency Patterns in Go | Part 2

Concurrency Patterns in Go | Part 2

In the part 1 we learned that channels are playing a crucial role in facilatating communication and sychronization between goroutines. While Go offers built-in channels that support complex synchronization patterns are required. In part 2, we will explore techniques that allow us to take this synchronization one step further.

The or-channel / The or-done-channel Pattern

To deal with more complex synchronization patterns, one such pattern is the Or-Channel, also known as the Or-Done-Channel. The Or-Channel is a powerful concept that allows you to combine multiple channels into a single channel, providing a way to wait for the first value or signal to be received from any of the input channels. It acts as a logical OR operator among the channels, ensuring that as soon as any channel sends a value or is closed, the Or-Channel receives that value or signal.

Use Case

Imagine a scenario where you want to perform multiple order requests simultaneously with pipeline pattern for each order (stage 1 (Validation Goroutine) , stage 2 (Enrichment Goroutine), and stage 3 (Calculation Goroutine)) and process accordinly all the responses that arrive. The Or-Channel allows you to achieve this behavior elegantly.

This pattern is particularly useful when you have these types multiple concurrent operations or goroutines running in parallel, and you need to wait for the first result or completion signal to second, and third respectively.

Let's try to explore by part one's example with simplified version.

Example

Assuming?Order?object resembles

type Order struct {
	ID     int
	Amount float64
}        

Stage 1 (Validation)

Each order is received and validated for correctness and completeness. Any invalid orders are filtered out.

func validateOrders(done <-chan struct{}, input <-chan Order, output chan<- Order){ 
   defer close(output) // Close the output channel when the function finishes

   for {
      select {
      case <-done:
         return
      case order, ok := <-input:
         if !ok {
            return
         }

         // Simulating validation
         time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

         // Validate order
         if order.Amount > 0 {
            output <- order
         }
      }
   }
}        

Stage 2 (Enrichment)

The valid orders are then enriched with additional information, such as customer details or product data, to enhance their content.

func enrichOrders(done <-chan struct{}, input <-chan Order, output chan<- Order) {
   defer close(output) // Close the output channel when the function finishes

   for {
      select {
      case <-done:
         return
      case order, ok := <-input:
         if !ok {
            return
         }

         // Simulating enrichment
         time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

         // Enrich order
         order.Amount *= 1.1
         output <- order
      }
   }
}        

Stage 3 (Calculation)

The enriched orders are processed further to perform calculations, such as total order value or shipping costs.

func calculateOrderValues(done <-chan struct{}, input <-chan Order, output chan<- Order) {
   defer close(output) // Signal the wait group that the processing is done for this goroutine

   for {
      select {
      case <-done:
         return
      case order, ok := <-input:
         if !ok {
            return
         }

         // Simulating calculation
         time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

         // Perform calculation
         order.Amount += 5.0
         output <- order
      }
   }
}        

Each stage is implemented as a separate goroutine, and channels are used to connect the stages. The orders flow through the pipeline, with each stage concurrently processing the orders it receives. This allows for parallelism, as multiple orders can be processed simultaneously by different stages.

func main() {
   rand.Seed(time.Now().UnixNano())

   // Create done channel to signal termination
   done := make(chan struct{})
   defer close(done)

   // Create channels to connect the stages
   orders := make(chan Order)
   validOrders := make(chan Order)
   enrichedOrders := make(chan Order)
   calculatedOrders := make(chan Order)

   // Start the stages concurrently
   go validateOrders(done, orders, validOrders)
   go enrichOrders(done, validOrders, enrichedOrders)
   go calculateOrderValues(done, enrichedOrders, calculatedOrders)

   // Generate sample orders
   go func() {
      for i := 1; i <= 10; i++ {
         order := Order{
            ID:     i,
            Amount: float64(i * 10),
         }
         orders <- order
      }
      close(orders)
   }()

   // Receive the final output from the pipeline
   for order := range calculatedOrders {
      fmt.Printf("Processed order ID: %d, Final amount: %.2f\n", order.ID, order.Amount)
   }
}        

In this code, the done channel is used to signal termination and gracefully shut down the concurrent stages of the pipeline.

Tee-Channel Pattern

The tee-channel pattern refers to a mechanism where a single channel is split into multiple channels to enable parallel processing of the data stream. It allows multiple consumers to receive the same data simultaneously.

Use Case

Imagine you have a system that receives a continuous stream of sensor data from multiple sensors distributed throughout a factory. You want to perform real-time analysis on this data and simultaneously store it for historical purposes. By utilizing a tee-channel, you can achieve this efficiently.

Here's how it would work:

  • To implement a tee-channel in Go, you can create a function that takes a source channel and multiple destination channels as arguments. Within this function, you can use a goroutine that loops over the source channel and sends the received values to all the destination channels.

func tee(source <-chan int, destinations ...chan<- int) {
   go func() {
      for value := range source {
         for _, dest := range destinations {
            dest <- value
         }
      }
      for _, dest := range destinations {
         close(dest)
      }
   }()
}        

  • The tee function takes a source channel source and a variadic argument destinations, which represents multiple destination channels. It launches a goroutine that continuously reads values from the source channel and sends them to each destination channel using a loop.Once the source channel is closed, the function closes all the destination channels to signal that no more values will be sent.

func main() {
   sensorData := make(chan int)
   realTimeAnalysis := make(chan int)
   storage := make(chan int)

   // Use tee function to split the source channel into two destination channels
   tee(sensorData, realTimeAnalysis, storage)

   // Send values to the source channel
   go func() {
      for i := 1; i <= 10; i++ {
         sensorData <- i
      }
      close(sensorData)
   }()

   // Consume values from the destination channels
   go func() {
      for value := range realTimeAnalysis {
         fmt.Println("Performing real-time analysis on data:", value)
      }
   }()
   go func() {
      for value := range storage {
         fmt.Println("Storing data:", value)
      }
   }()

   // Wait for goroutines to finish
   time.Sleep(time.Second)
}        

By using goroutines and channels, we achieve concurrent processing of the same data stream, allowing real-time analysis and storage to happen simultaneously.

Bridge-Channel Pattern

The bridge-channel pattern refers to connect or combine multiple channels into a single channel. It allows you to merge the streams of data from different channels into a unified stream, making it easier to consume and process the combined data.In this case, the bridge channel acts as a connector between multiple channels, enabling the flow of data between them.

Here's how it would work:

  • To implement a bridge channel in Go, you can create a function that takes multiple input channels and returns a single output channel.

func bridge(inputs ...<-chan int) <-chan int {
   output := make(chan int, len(inputs))

   go func() {
      defer close(output)

      var wg sync.WaitGroup
      wg.Add(len(inputs))

      for _, input := range inputs {
         go func(ch <-chan int) {
            defer wg.Done()
            for value := range ch {
               output <- value
            }
         }(input)
      }

      wg.Wait()
   }()

   return output
}        

  • The bridge function is called with sensorData1 and sensorData2 as arguments, which bridges these two input channels into a single output channel.

func main() {
   sensorData1 := make(chan int)
   sensorData2 := make(chan int)

   var wg sync.WaitGroup
   wg.Add(2)

   // Send values to input1 and input2 channels
   go func() {
      defer wg.Done()
      for i := 1; i <= 5; i++ {
         sensorData1 <- i
      }
      close(sensorData1)
   }()
   go func() {
      defer wg.Done()
      for i := 6; i <= 10; i++ {
         sensorData2 <- i
      }
      close(sensorData2)
   }()

   // Bridge the input channels into a single output channel
   output := bridge(sensorData1, sensorData2)

   // Consume values from the output channel
   go func() {
      defer wg.Done()
      for value := range output {
         fmt.Println("Received Full Sensor Data: ", value)
      }
   }()

   wg.Wait()
}        

The code sets up two channels to send sensor data, bridges these channels into a single output channel, and consumes the values from the output channel concurrently using goroutines. The use of channels and goroutines allows for concurrent communication and processing of sensor data.

Conclusion

In conclusion, the use of channels in Go programming plays a crucial role in facilitating communication and synchronization between goroutines. In complex scenarios where advanced synchronization patterns are required, techniques such as the Or-Channel, Tee-Channel, and Bridge-Channel patterns can be used to enhance synchronization and parallel processing capabilities.


I'm constantly delighted to receive feedback. Whether you spot an error, have a suggestion for improvement, or just want to share your thoughts, please don't hesitate to comment/reach out. I truly value connecting with readers!

要查看或添加评论,请登录

Miah Md Shahjahan的更多文章

社区洞察

其他会员也浏览了