Mastering Fan-Out, Fan-In in Golang: Supercharge Your Concurrency Skills

Golang thrives on simplicity and efficiency, and one of its most elegant features is its built-in concurrency model. With goroutines and channels, Go makes it ridiculously easy to manage concurrent processes—without the headaches of traditional threading models.

But how do you take this power and scale it efficiently? Enter the Fan-Out, Fan-In pattern. Today, we’ll break it down with a real-world example: downloading files in parallel and adding them to a ZIP archive. We’ll compare a naive synchronous approach versus an optimized concurrent solution—showing you how to write performant and scalable Go applications.

Prerequisites

To make the most of this, you should be familiar with:

What is Fan-Out, Fan-In?

You might be thinking, "Oh no, another fancy concurrency term!" Don’t worry. By the end of this article, you’ll realize this is just common sense wrapped in a cool name.

Fan-Out

Multiple worker goroutines process data from a single input channel concurrently, improving throughput.

Analogy: Your team leader gets tasks from the manager and distributes them among you and your colleagues. More hands, faster work!

Fan-In

Multiple goroutines send their results to a single output channel, aggregating data efficiently.

Analogy: Your scrum master collects everyone’s work updates and compiles them into a single report.
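Here is a minimal, self-contained sketch that makes both definitions concrete before we get to files. It squares the numbers 1 to 10. Three workers read from one input channel (Fan-Out), and their three output channels are merged into one (Fan-In). The names fanOut, fanIn, square, and sumOfSquares are illustrative only and do not come from the article's repository.

```go
package main

import (
	"fmt"
	"sync"
)

// square stands in for any independent unit of work.
func square(n int) int { return n * n }

// fanOut starts `workers` goroutines that all read from the same input
// channel. Each worker gets its own output channel.
func fanOut(in <-chan int, workers int) []<-chan int {
	outs := make([]<-chan int, workers)
	for i := 0; i < workers; i++ {
		out := make(chan int)
		outs[i] = out
		go func() {
			defer close(out)
			for n := range in { // each value is received by exactly one worker
				out <- square(n)
			}
		}()
	}
	return outs
}

// fanIn merges several channels into one and closes it after every input
// channel has been drained.
func fanIn(ins ...<-chan int) <-chan int {
	merged := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, ch := range ins {
		go func(ch <-chan int) {
			defer wg.Done()
			for v := range ch {
				merged <- v
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

// sumOfSquares feeds 1..n through three workers and adds up the results.
func sumOfSquares(n int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()
	total := 0
	for v := range fanIn(fanOut(in, 3)...) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares(10)) // 385
}
```

The workers finish in no particular order, so results arrive unordered. Summing them still gives a deterministic answer regardless of how the goroutines are scheduled.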

The Problem: Downloading and Zipping Files

Imagine we need to download 10 files. Each takes 2 seconds to download. Once downloaded, we must add each to a ZIP archive, which takes 1 second per file. If we do this synchronously, it would take:

(10 files × 2 s) + (10 files × 1 s) = 20 s + 10 s = 30 seconds

Let’s look at the naive approach:

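package filedownloader

import (
	"log"
	"time"
)

// DownloadFiles downloads every URL it receives from fileUrlsStream, one at a time.
func DownloadFiles(done <-chan bool, fileUrlsStream <-chan string) <-chan []byte {
	fileStream := make(chan []byte, 10)

	go func() {
		defer close(fileStream)
		for url := range fileUrlsStream {
			log.Printf("Downloading file from url: %s\n", url)
			// Simulate the download: each file takes 2 seconds.
			time.Sleep(2 * time.Second)
			// Send the result, but stop early if done has been closed.
			select {
			case <-done:
				return
			case fileStream <- []byte("file content " + url):
			}
		}
	}()
	return fileStream
}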

This function downloads files one at a time—not great. Now let’s Fan-Out this process.
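DownloadFiles only simulates the network call with time.Sleep. One possible way to replace that placeholder with a real download is the standard library's net/http client, sketched below. The helper names downloadURL and fetchFromLocalServer are hypothetical. The local httptest server only stands in for a real file host. This sketch also uses a context.Context for cancellation instead of the article's done channel.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// downloadURL fetches the body at url and fails on any non-200 status.
// Hypothetical helper: not taken from the article's repository.
func downloadURL(ctx context.Context, client *http.Client, url string) ([]byte, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("GET %s: unexpected status %s", url, resp.Status)
	}
	return io.ReadAll(resp.Body)
}

// fetchFromLocalServer starts a throwaway test server that plays the role of
// a file host, then downloads one path from it.
func fetchFromLocalServer(path string) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/missing" {
			http.NotFound(w, r)
			return
		}
		fmt.Fprint(w, "file content "+r.URL.Path)
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	body, err := downloadURL(ctx, srv.Client(), srv.URL+path)
	return string(body), err
}

func main() {
	body, err := fetchFromLocalServer("/a.txt")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(body) // file content /a.txt
}
```

Inside DownloadFiles, each worker could call a helper like downloadURL for every URL it receives and send the body on fileStream. How errors should travel through the pipeline is left open here.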

Implementing Fan-Out for Downloads

Instead of downloading one file at a time, let’s distribute the work across multiple workers.

numberOfWorkers := 4
urlStream := fileUrlStreamGenerator(fileUrls)
downloadStreamArr := make([]<-chan []byte, numberOfWorkers)
// Fan-Out: launch multiple workers. Every worker ranges over the SAME urlStream,
// so each URL is received by exactly one worker.
for i := 0; i < numberOfWorkers; i++ {
	downloadStreamArr[i] = filedownloader.DownloadFiles(done, urlStream)
}

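Now we have 4 workers pulling URLs from the same channel concurrently. Each worker downloads one whole file at a time, so 10 files spread over 4 workers take 3 rounds (4 + 4 + 2 files) of 2 s each. That cuts the download stage from 20 s to roughly 6 s, which is already a 70% improvement!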
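This round-based estimate can be written as a tiny helper. Each worker handles whole files, so the number of download rounds is the ceiling of files divided by workers. downloadStageTime is a hypothetical name. The model assumes equally sized files and ignores network variance and scheduling overhead.

```go
package main

import (
	"fmt"
	"time"
)

// downloadStageTime estimates the wall-clock time of the download stage when
// `files` equally sized downloads are shared by `workers` goroutines, each
// handling one file at a time.
func downloadStageTime(files, workers int, perFile time.Duration) time.Duration {
	if workers < 1 {
		workers = 1
	}
	rounds := (files + workers - 1) / workers // integer ceil(files / workers)
	return time.Duration(rounds) * perFile
}

func main() {
	fmt.Println(downloadStageTime(10, 1, 2*time.Second)) // 20s
	fmt.Println(downloadStageTime(10, 4, 2*time.Second)) // 6s
}
```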

Implementing Fan-In: Processing the Downloaded Files

As the workers finish their downloads, we need to funnel their separate output streams into the single ZIP stage. Instead of reading each worker's channel in turn, we can Fan-In all of them into one stream.

func Merge(done <-chan bool, contentStreams ...<-chan []byte) <-chan []byte {
	mergedStream := make(chan []byte)
	wg := sync.WaitGroup{}
	// This function takes a single content stream and multiplexes it to mergedStream
	multiplex := func(done <-chan bool, contentStream <-chan []byte) {
		defer wg.Done()
		for content := range contentStream {
			// merge content
			select {
			case <-done:
				return
			case mergedStream <- content:
			}
		}
	}

	wg.Add(len(contentStreams))
	for _, contentStream := range contentStreams {
		// Fan-In: start one goroutine per content stream to multiplex
		// its values into mergedStream
		go multiplex(done, contentStream)
	}
	// Close mergedStream once all multiplexing goroutines have finished
	go func() {
		wg.Wait()
		close(mergedStream)
	}()
	return mergedStream
}

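Note that Fan-In does not make the ZIP step itself faster. ProcessContent still adds one file at a time (1 s each, about 10 s in total), because a single archive has to be written sequentially. What the pipeline gains is overlap. Zipping starts as soon as the first batch of downloads arrives (after about 2 s) and continues while the remaining downloads finish, so the whole run takes roughly 2 s + 10 s ≈ 12 s.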
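Merge is written for []byte streams only. Since Go 1.18, the same fan-in logic can be written once for any element type using type parameters. The sketch below is a generic variant, not the article's code. It also switches the done channel from bool to struct{}, a common idiom for channels used purely for signaling. The helper gen is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Merge fans in any number of channels of T into one channel and stops
// forwarding early once done is closed.
func Merge[T any](done <-chan struct{}, streams ...<-chan T) <-chan T {
	merged := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(streams))
	for _, s := range streams {
		go func(s <-chan T) {
			defer wg.Done()
			for v := range s {
				select {
				case <-done:
					return
				case merged <- v:
				}
			}
		}(s)
	}
	// Close merged only after every forwarding goroutine has returned.
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

// gen returns an already-closed buffered channel holding vals.
func gen[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	done := make(chan struct{})
	defer close(done)
	count := 0
	for range Merge(done, gen("a", "b"), gen("c"), gen[string]()) {
		count++
	}
	fmt.Println(count) // 3
}
```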

What Stages Are Best Candidates for Using Fan-Out?

Not every stage in a pipeline benefits from parallel execution. Here are some key scenarios where Fan-Out makes a difference:

  • I/O-Bound Tasks → Reading from disk, network requests, or database queries.
  • CPU-Intensive Computation → Tasks like image processing, encryption, or complex calculations (the speed-up is capped by the number of available CPU cores).
  • Independent Data Processing → When each task doesn’t rely on the output of others.

However, not all tasks are good candidates. If a stage requires sequential operations (e.g., writing to a single file), adding Fan-Out might reduce performance instead of improving it.
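For CPU-bound stages, a common rule of thumb is to size the worker pool to the number of CPU cores. By default, GOMAXPROCS is roughly the CPU count, and at most GOMAXPROCS goroutines execute Go code simultaneously. The sketch below fans SHA-256 hashing out over runtime.NumCPU() workers and fans the results back in. hashAll and result are hypothetical names. Tagging each result with its input index is one way to restore order after an unordered fan-in.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"runtime"
	"sync"
)

// result pairs an input index with its digest so order can be restored.
type result struct {
	index  int
	digest [32]byte
}

// hashAll computes SHA-256 digests of inputs with one worker per CPU core.
func hashAll(inputs [][]byte) [][32]byte {
	jobs := make(chan int)
	results := make(chan result)
	var wg sync.WaitGroup

	workers := runtime.NumCPU()
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() { // Fan-Out: all workers read from the same jobs channel
			defer wg.Done()
			for i := range jobs {
				results <- result{index: i, digest: sha256.Sum256(inputs[i])}
			}
		}()
	}

	go func() { // feed job indexes, then signal that no more are coming
		for i := range inputs {
			jobs <- i
		}
		close(jobs)
	}()
	go func() { // close results once every worker has exited
		wg.Wait()
		close(results)
	}()

	// Fan-In: collect unordered results back into input order.
	out := make([][32]byte, len(inputs))
	for r := range results {
		out[r.index] = r.digest
	}
	return out
}

func main() {
	digests := hashAll([][]byte{[]byte("a"), []byte("b")})
	fmt.Printf("%x\n", digests[0][:4])
}
```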

Additional Helper Functions

To make this work seamlessly, here are three key functions used in the pipeline:

ProcessContent

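package fileprocessor

import (
	"log"
	"time"
)

func ProcessContent(done <-chan bool, fileStream <-chan []byte) <-chan string {
	contentStream := make(chan string)

	go func() {
		defer close(contentStream)
		outFileName := "output.zip"
		// create a zip archive (simulated here)

		for fileContent := range fileStream {
			// add the file to the zip archive; the 1 s sleep simulates the write
			time.Sleep(1 * time.Second)
			log.Printf("Adding file %s to zip archive\n", fileContent)
		}
		// Report the archive name unless the pipeline was cancelled, so this
		// goroutine cannot block forever on a channel nobody reads.
		select {
		case <-done:
		case contentStream <- outFileName:
		}
	}()
	return contentStream
}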
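ProcessContent only simulates the archive with time.Sleep. Below is a sketch of the real work using the standard archive/zip package. namedFile, writeZip, entryNames, and buildDemoArchive are hypothetical. The article's pipeline streams bare []byte values, so a real version would need to carry a file name alongside each body. A single goroutine owns the zip.Writer, which fits the earlier point that writing to one file is inherently sequential.

```go
package main

import (
	"archive/zip"
	"bytes"
	"fmt"
	"io"
)

// namedFile pairs a file name with its downloaded bytes (hypothetical type).
type namedFile struct {
	Name    string
	Content []byte
}

// writeZip adds every file received on files to a ZIP archive written to w.
// Entries are written one after another by this single goroutine.
func writeZip(w io.Writer, files <-chan namedFile) error {
	zw := zip.NewWriter(w)
	for f := range files {
		entry, err := zw.Create(f.Name)
		if err != nil {
			return err
		}
		if _, err := entry.Write(f.Content); err != nil {
			return err
		}
	}
	// Close writes the central directory; the archive is invalid without it.
	return zw.Close()
}

// entryNames lists the entries of an in-memory ZIP archive.
func entryNames(archive []byte) ([]string, error) {
	zr, err := zip.NewReader(bytes.NewReader(archive), int64(len(archive)))
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(zr.File))
	for _, f := range zr.File {
		names = append(names, f.Name)
	}
	return names, nil
}

// buildDemoArchive zips two in-memory files and returns the archive bytes.
func buildDemoArchive() ([]byte, error) {
	files := make(chan namedFile, 2)
	files <- namedFile{Name: "a.txt", Content: []byte("file content a")}
	files <- namedFile{Name: "b.txt", Content: []byte("file content b")}
	close(files)

	var buf bytes.Buffer
	if err := writeZip(&buf, files); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	archive, err := buildDemoArchive()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	names, err := entryNames(archive)
	fmt.Println(names, err) // [a.txt b.txt] <nil>
}
```

In the pipeline, w would typically be an *os.File created for output.zip. An in-memory bytes.Buffer is used here so the example runs anywhere.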

fileUrlStreamGenerator

func fileUrlStreamGenerator(fileUrls []string) <-chan string {
	fileUrlStream := make(chan string, len(fileUrls))
	go func() {
		defer close(fileUrlStream)
		for _, fileUrl := range fileUrls {
			fileUrlStream <- fileUrl
		}
	}()
	return fileUrlStream
}        

startSynchronousProcessingStage

func startSynchronousProcessingStage(done <-chan bool, fileUrls []string) {
	log.Println("Starting synchronous processing stage")
	startTime := time.Now()
	urlStream := fileUrlStreamGenerator(fileUrls)

	downloadFileStream := filedownloader.DownloadFiles(done, urlStream)
	zipFileName := <-fileprocessor.ProcessContent(done, downloadFileStream)
	log.Println("Zip file created:", zipFileName)
	log.Println("Time taken in synchronous processing stage:", time.Since(startTime))
}        
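The synchronous stage is shown in full above, but the concurrent stage appears only in fragments. Below is a self-contained sketch of how those pieces could be wired together. startConcurrentProcessingStage and the lowercase helpers are hypothetical stand-ins for the article's functions. The 2 s and 1 s delays are scaled down to 20 ms and 10 ms so the sketch runs quickly. The repository's actual wiring may differ.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Scaled-down stand-ins for the article's 2 s download and 1 s zip delays.
const (
	downloadDelay = 20 * time.Millisecond
	zipDelay      = 10 * time.Millisecond
)

// download plays the role of DownloadFiles: one worker, one file at a time.
func download(done <-chan bool, urls <-chan string) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for u := range urls {
			time.Sleep(downloadDelay) // simulated network I/O
			select {
			case <-done:
				return
			case out <- []byte("file content " + u):
			}
		}
	}()
	return out
}

// merge is the Fan-In step, equivalent to the article's Merge.
func merge(done <-chan bool, streams ...<-chan []byte) <-chan []byte {
	merged := make(chan []byte)
	var wg sync.WaitGroup
	wg.Add(len(streams))
	for _, s := range streams {
		go func(s <-chan []byte) {
			defer wg.Done()
			for v := range s {
				select {
				case <-done:
					return
				case merged <- v:
				}
			}
		}(s)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

// zipAll plays the role of ProcessContent: a single consumer that handles
// files sequentially and returns how many it processed.
func zipAll(files <-chan []byte) int {
	n := 0
	for range files {
		time.Sleep(zipDelay) // simulated write into the archive
		n++
	}
	return n
}

// startConcurrentProcessingStage wires URLs -> N downloaders (Fan-Out)
// -> merge (Fan-In) -> one zip stage.
func startConcurrentProcessingStage(urls []string, workers int) (int, time.Duration) {
	start := time.Now()
	done := make(chan bool)
	defer close(done) // hygiene: signal any goroutine that is still running

	// Like fileUrlStreamGenerator: a buffered channel pre-filled with all URLs.
	urlStream := make(chan string, len(urls))
	for _, u := range urls {
		urlStream <- u
	}
	close(urlStream)

	streams := make([]<-chan []byte, workers)
	for i := range streams {
		streams[i] = download(done, urlStream)
	}
	zipped := zipAll(merge(done, streams...))
	return zipped, time.Since(start)
}

func main() {
	urls := make([]string, 10)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/file-%d", i)
	}
	n, elapsed := startConcurrentProcessingStage(urls, 4)
	fmt.Printf("zipped %d files in %v\n", n, elapsed.Round(time.Millisecond))
}
```

With these delays, the run is dominated by the ten sequential zip steps. This is consistent with the single ZIP stage becoming the bottleneck once downloads are fanned out.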

The Final Result

  • Synchronous - The time taken was approximately 30 seconds.
  • Fan-Out, Fan-In (4 download workers) - The time taken was approximately 12 seconds.

That’s a 60% reduction in wall-clock time, achieved simply by using Go’s concurrency primitives.

Where to Find the Code?

You can find the complete implementation on my GitHub repository: https://github.com/architagr/The-Weekly-Golang-Journal/tree/main/fan-out-fan-in

Why This Matters

This example highlights how Go’s lightweight concurrency model makes it incredibly simple to build scalable, efficient applications. With just a few lines of code, we:

  • Spun up multiple goroutines to execute tasks concurrently.
  • Used sync.WaitGroup to close the merged channel only after every Fan-In goroutine had finished.
  • Used channels to collect results safely.
  • Drastically reduced execution time without adding unnecessary complexity.

Conclusion

If you’re already using Go, you know how beautiful and efficient its concurrency model is. If you’re not using Go yet, this is a perfect example of why you should start! The ability to easily scale workloads using Fan-Out and Fan-In makes Go an ideal language for performance-intensive applications like web servers, data pipelines, and background processing systems.

Golang makes concurrency simple, elegant, and powerful—why not take advantage of it?

Stay Connected!

Follow me here on LinkedIn for more insights on software development and architecture: https://www.dhirubhai.net/in/architagarwal984/

Subscribe to my YouTube channel for in-depth tutorials: https://lnkd.in/gauaRed7

Sign up for my newsletter, The Weekly Golang Journal, for exclusive content: https://lnkd.in/g8DzK7Ts

Follow me on Medium for detailed articles: https://lnkd.in/gXSMeXxm

Join the discussion on my subreddit, r/GolangJournal, and be part of the community!

More articles by Archit Agarwal