Concurrency Patterns in Go | Part 2
In the part 1 we learned that channels are playing a crucial role in facilatating communication and sychronization between goroutines. While Go offers built-in channels that support complex synchronization patterns are required. In part 2, we will explore techniques that allow us to take this synchronization one step further.
The or-channel / The or-done-channel Pattern
To deal with more complex synchronization patterns, one such pattern is the Or-Channel, also known as the Or-Done-Channel. The Or-Channel is a powerful concept that allows you to combine multiple channels into a single channel, providing a way to wait for the first value or signal to be received from any of the input channels. It acts as a logical OR operator among the channels, ensuring that as soon as any channel sends a value or is closed, the Or-Channel receives that value or signal.
Use Case
Imagine a scenario where you want to perform multiple order requests simultaneously with pipeline pattern for each order (stage 1 (Validation Goroutine) , stage 2 (Enrichment Goroutine), and stage 3 (Calculation Goroutine)) and process accordinly all the responses that arrive. The Or-Channel allows you to achieve this behavior elegantly.
This pattern is particularly useful when you have these types multiple concurrent operations or goroutines running in parallel, and you need to wait for the first result or completion signal to second, and third respectively.
Let's try to explore by part one's example with simplified version.
Example
Assuming?Order?object resembles
type Order struct {
ID int
Amount float64
}
Stage 1 (Validation)
Each order is received and validated for correctness and completeness. Any invalid orders are filtered out.
func validateOrders(done <-chan struct{}, input <-chan Order, output chan<- Order){
defer close(output) // Close the output channel when the function finishes
for {
select {
case <-done:
return
case order, ok := <-input:
if !ok {
return
}
// Simulating validation
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
// Validate order
if order.Amount > 0 {
output <- order
}
}
}
}
Stage 2 (Enrichment)
The valid orders are then enriched with additional information, such as customer details or product data, to enhance their content.
func enrichOrders(done <-chan struct{}, input <-chan Order, output chan<- Order) {
defer close(output) // Close the output channel when the function finishes
for {
select {
case <-done:
return
case order, ok := <-input:
if !ok {
return
}
// Simulating enrichment
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
// Enrich order
order.Amount *= 1.1
output <- order
}
}
}
Stage 3 (Calculation)
The enriched orders are processed further to perform calculations, such as total order value or shipping costs.
func calculateOrderValues(done <-chan struct{}, input <-chan Order, output chan<- Order) {
defer close(output) // Signal the wait group that the processing is done for this goroutine
for {
select {
case <-done:
return
case order, ok := <-input:
if !ok {
return
}
// Simulating calculation
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
// Perform calculation
order.Amount += 5.0
output <- order
}
}
}
Each stage is implemented as a separate goroutine, and channels are used to connect the stages. The orders flow through the pipeline, with each stage concurrently processing the orders it receives. This allows for parallelism, as multiple orders can be processed simultaneously by different stages.
func main() {
rand.Seed(time.Now().UnixNano())
// Create done channel to signal termination
done := make(chan struct{})
defer close(done)
// Create channels to connect the stages
orders := make(chan Order)
validOrders := make(chan Order)
enrichedOrders := make(chan Order)
calculatedOrders := make(chan Order)
// Start the stages concurrently
go validateOrders(done, orders, validOrders)
go enrichOrders(done, validOrders, enrichedOrders)
go calculateOrderValues(done, enrichedOrders, calculatedOrders)
// Generate sample orders
go func() {
for i := 1; i <= 10; i++ {
order := Order{
ID: i,
Amount: float64(i * 10),
}
orders <- order
}
close(orders)
}()
// Receive the final output from the pipeline
for order := range calculatedOrders {
fmt.Printf("Processed order ID: %d, Final amount: %.2f\n", order.ID, order.Amount)
}
}
In this code, the done channel is used to signal termination and gracefully shut down the concurrent stages of the pipeline.
领英推荐
Tee-Channel Pattern
The tee-channel pattern refers to a mechanism where a single channel is split into multiple channels to enable parallel processing of the data stream. It allows multiple consumers to receive the same data simultaneously.
Use Case
Imagine you have a system that receives a continuous stream of sensor data from multiple sensors distributed throughout a factory. You want to perform real-time analysis on this data and simultaneously store it for historical purposes. By utilizing a tee-channel, you can achieve this efficiently.
Here's how it would work:
func tee(source <-chan int, destinations ...chan<- int) {
go func() {
for value := range source {
for _, dest := range destinations {
dest <- value
}
}
for _, dest := range destinations {
close(dest)
}
}()
}
func main() {
sensorData := make(chan int)
realTimeAnalysis := make(chan int)
storage := make(chan int)
// Use tee function to split the source channel into two destination channels
tee(sensorData, realTimeAnalysis, storage)
// Send values to the source channel
go func() {
for i := 1; i <= 10; i++ {
sensorData <- i
}
close(sensorData)
}()
// Consume values from the destination channels
go func() {
for value := range realTimeAnalysis {
fmt.Println("Performing real-time analysis on data:", value)
}
}()
go func() {
for value := range storage {
fmt.Println("Storing data:", value)
}
}()
// Wait for goroutines to finish
time.Sleep(time.Second)
}
By using goroutines and channels, we achieve concurrent processing of the same data stream, allowing real-time analysis and storage to happen simultaneously.
Bridge-Channel Pattern
The bridge-channel pattern refers to connect or combine multiple channels into a single channel. It allows you to merge the streams of data from different channels into a unified stream, making it easier to consume and process the combined data.In this case, the bridge channel acts as a connector between multiple channels, enabling the flow of data between them.
Here's how it would work:
func bridge(inputs ...<-chan int) <-chan int {
output := make(chan int, len(inputs))
go func() {
defer close(output)
var wg sync.WaitGroup
wg.Add(len(inputs))
for _, input := range inputs {
go func(ch <-chan int) {
defer wg.Done()
for value := range ch {
output <- value
}
}(input)
}
wg.Wait()
}()
return output
}
func main() {
sensorData1 := make(chan int)
sensorData2 := make(chan int)
var wg sync.WaitGroup
wg.Add(2)
// Send values to input1 and input2 channels
go func() {
defer wg.Done()
for i := 1; i <= 5; i++ {
sensorData1 <- i
}
close(sensorData1)
}()
go func() {
defer wg.Done()
for i := 6; i <= 10; i++ {
sensorData2 <- i
}
close(sensorData2)
}()
// Bridge the input channels into a single output channel
output := bridge(sensorData1, sensorData2)
// Consume values from the output channel
go func() {
defer wg.Done()
for value := range output {
fmt.Println("Received Full Sensor Data: ", value)
}
}()
wg.Wait()
}
The code sets up two channels to send sensor data, bridges these channels into a single output channel, and consumes the values from the output channel concurrently using goroutines. The use of channels and goroutines allows for concurrent communication and processing of sensor data.
Conclusion
In conclusion, the use of channels in Go programming plays a crucial role in facilitating communication and synchronization between goroutines. In complex scenarios where advanced synchronization patterns are required, techniques such as the Or-Channel, Tee-Channel, and Bridge-Channel patterns can be used to enhance synchronization and parallel processing capabilities.
I'm constantly delighted to receive feedback. Whether you spot an error, have a suggestion for improvement, or just want to share your thoughts, please don't hesitate to comment/reach out. I truly value connecting with readers!