Producer-Consumer Problem with Go


What is the Producer-Consumer Problem?

  • The producer produces jobs.
  • The consumer consumes those jobs.
  • Both of them (producer and consumer) share a buffer of limited size.
  • The producer puts its jobs into the buffer.
  • The consumer picks a job from the buffer and processes it.


As per Wikipedia, both producer and consumer are cyclic processes. Each time the producer goes through its cycle, it produces a certain portion of information that has to be processed by the consumer. Each time the consumer goes through its cycle, it can process the next portion of information produced by the producer.

The producer and consumer are connected by the buffer, so they must stay in sync. The consumer should not try to consume information when the buffer is empty, and the producer should not try to put information into the buffer when it is full.
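To make the two rules above concrete, here is a minimal sketch that uses a buffered channel as the bounded buffer. The helper names tryPut and tryGet are hypothetical and exist only for illustration; they use select with a default case so that the "full" and "empty" states show up as a false result instead of a blocked goroutine.

```go
package main

import "fmt"

// tryPut (hypothetical helper) attempts a non-blocking send.
// It reports false when the buffer is full.
func tryPut(buf chan<- int, v int) bool {
	select {
	case buf <- v:
		return true
	default:
		return false
	}
}

// tryGet (hypothetical helper) attempts a non-blocking receive.
// ok is false when the buffer is empty.
func tryGet(buf <-chan int) (v int, ok bool) {
	select {
	case v = <-buf:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	buf := make(chan int, 2) // bounded buffer with room for two jobs

	fmt.Println(tryPut(buf, 1)) // true
	fmt.Println(tryPut(buf, 2)) // true
	fmt.Println(tryPut(buf, 3)) // false: the buffer is full

	v, ok := tryGet(buf)
	fmt.Println(v, ok) // 1 true
	v, ok = tryGet(buf)
	fmt.Println(v, ok) // 2 true
	v, ok = tryGet(buf)
	fmt.Println(v, ok) // 0 false: the buffer is empty
}
```

In ordinary producer-consumer code there is usually no need for such helpers: a plain send on a full channel and a plain receive on an empty channel simply block until the other side catches up, which is exactly the synchronization the problem asks for.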

I will put up some common variations of the problem and their implementations in Go.

Basic knowledge of channels and goroutines is a prerequisite for the code snippets below. Go uses goroutines and channels to implement concurrency.


Code Snippet 1 - Single Consumer, Single Producer

package main

import "fmt"

var todos = []string{
	"Watching movie",
	"Play Sport",
	"Do Coding",
	"Clean house",
	"Eat Healthy",
}

func main() {
	ch := make(chan string)
	done := make(chan bool)

	go producer(ch)
	go consumer(ch, done)

	<-done
}

func producer(ch chan<- string) {
	for _, todo := range todos {
		ch <- todo
	}
	close(ch)
}

func consumer(ch <-chan string, done chan<- bool) {
	for todo := range ch {
		fmt.Println("Processing Todo - ", todo)
	}

	done <- true
}        

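Output:

Processing Todo -  Watching movie
Processing Todo -  Play Sport
Processing Todo -  Do Coding
Processing Todo -  Clean house
Processing Todo -  Eat Healthy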


  • The following snippets use wait groups. A wait group lets the main goroutine wait for other goroutines to finish; without it, the main goroutine would exit and terminate any goroutines that are still running.
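As a small, isolated sketch of that idea, the hypothetical function runWorkers below starts n goroutines and uses a sync.WaitGroup to block until all of them have called Done. The function name and the squaring work are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers (hypothetical) starts n goroutines and returns only
// after every one of them has finished.
func runWorkers(n int) []int {
	results := make([]int, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // register one more goroutine to wait for
		go func(id int) {
			defer wg.Done()       // signal completion when this goroutine returns
			results[id] = id * id // each goroutine writes only its own slot
		}(i)
	}

	wg.Wait() // block until the counter drops back to zero
	return results
}

func main() {
	fmt.Println(runWorkers(4)) // [0 1 4 9]
}
```

Without the wg.Wait() call, runWorkers could return before some goroutines had written their result.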

Code Snippet 2 - 2 Consumers, 1 Producer

package main

import (
	"fmt"
	"sync"
)

var todos = []string{
	"Watching movie",
	"Play Sport",
	"Do Coding",
	"Clean house",
	"Eat Healthy",
	"Make Art",
	"Walk outside",
	"Drink Water",
	"Sleep",
	"Exercise",
}

func main() {
	ch := make(chan string)

	var wg sync.WaitGroup

	wg.Add(3)
	go producer(ch, &wg)
	go consumer2(ch, &wg)
	go consumer1(ch, &wg)
	wg.Wait()
}

func consumer1(ch <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()

	for val := range ch {
		fmt.Println("Consumer 1 Processing: ", val)
	}
}

func consumer2(ch <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()

	for val := range ch {
		fmt.Println("Consumer 2 Processing: ", val)
	}
}

func producer(ch chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()

	for _, todo := range todos {
		ch <- todo
	}
	close(ch)
}
        

Code Snippet 3 - Single Consumer, 2 Producers

package main

import (
	"fmt"
	"sync"
)

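func main() {
	ch := make(chan int)

	// Separate wait groups so main can tell when all producers are done.
	var producers, consumers sync.WaitGroup
	var mu sync.Mutex

	producers.Add(2)
	go producer1(ch, &producers, &mu)
	go producer2(ch, &producers, &mu)

	consumers.Add(1)
	go consumer(ch, &consumers)

	// Close the channel only after both producers have finished sending.
	// Closing it inside one producer panics if the other one sends afterwards.
	producers.Wait()
	close(ch)

	consumers.Wait()
}

func consumer(ch <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()

	for val := range ch {
		fmt.Println("Val = ", val)
	}
}

func producer1(ch chan<- int, wg *sync.WaitGroup, mu *sync.Mutex) {
	defer wg.Done()

	// Hold the lock for the whole batch so the values are not interleaved.
	mu.Lock()
	for i := 1; i <= 10; i++ {
		ch <- i
	}
	mu.Unlock()
}

func producer2(ch chan<- int, wg *sync.WaitGroup, mu *sync.Mutex) {
	defer wg.Done()

	mu.Lock()
	for i := 11; i <= 20; i++ {
		ch <- i
	}
	mu.Unlock()
}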


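Notice that each producer's values are consumed as one uninterrupted batch.

In the code snippet above, I have used a mutex because I wanted to maintain the order in which the values are consumed by the consumer. The mutex keeps each producer's batch together (1 to 10, and 11 to 20); which batch comes first depends on which producer acquires the lock first. If the order is not important, the mutex can be removed.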

Code Snippet 4 - 2 Consumers, 2 Producers

package main

import (
	"fmt"
	"sync"
	"time"
)

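func main() {
	ch := make(chan int)

	// Separate wait groups so main can tell when all producers are done.
	var producers, consumers sync.WaitGroup
	var mu sync.Mutex

	producers.Add(2)
	go producer1(ch, &producers, &mu)
	go producer2(ch, &producers, &mu)

	consumers.Add(2)
	go consumer2(ch, &consumers)
	go consumer1(ch, &consumers)

	// Close the channel only after both producers have finished sending.
	// Closing it inside one producer panics if the other one sends afterwards.
	producers.Wait()
	close(ch)

	consumers.Wait()
}

func consumer1(ch <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()

	for val := range ch {
		fmt.Println("Consumer 1 Val = ", val)
		time.Sleep(1 * time.Second) // simulate slow processing
	}
}

func consumer2(ch <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()

	for val := range ch {
		fmt.Println("Consumer 2 Val = ", val)
		time.Sleep(1 * time.Second) // simulate slow processing
	}
}

func producer1(ch chan<- int, wg *sync.WaitGroup, mu *sync.Mutex) {
	defer wg.Done()

	// Hold the lock for the whole batch so the values are not interleaved.
	mu.Lock()
	for i := 1; i <= 10; i++ {
		ch <- i
	}
	mu.Unlock()
}

func producer2(ch chan<- int, wg *sync.WaitGroup, mu *sync.Mutex) {
	defer wg.Done()

	mu.Lock()
	for i := 11; i <= 20; i++ {
		ch <- i
	}
	mu.Unlock()
}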
        


I hope this article is helpful for those who are still trying to improve their understanding of goroutines and channels in Go, especially the producer-consumer pattern.
