An overview of parallel programming (Go edition)

In practice, the software we write runs on several processors. Unfortunately, much of what we take for granted on a single processor becomes false when there is more than one processor. For example, if two processors modify the same piece of memory, what is the state of the memory after the modifications? It is difficult to tell in general. The modification made by one processor could overwrite the modification made by the other. The reverse could be true: the modification made by the other processor could win out. Or both processors could attempt the modification at once and leave the memory in a confused state that corresponds to nothing we would like to see. We call such accesses a ‘data race’: a situation where two or more processors in a program access the same memory location simultaneously, without proper synchronization, and at least one of those accesses is a write.

It gets more difficult when you want two or more processors to meaningfully modify the same memory. For example, suppose that you have a variable that counts the number of products sold, and different processors increment this one variable. If two processors read the same old value and each writes back that value plus one, one of the two sales is lost.
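
To make the lost sale concrete, here is a minimal sketch that replays one unlucky interleaving by hand. It is a single-threaded simulation, not real parallel code, and the function name lostUpdate is purely illustrative.

```go
package main

import "fmt"

// lostUpdate replays, step by step, one unlucky interleaving of two
// processors that each try to add 1 to the same counter without
// synchronization. Everything runs on one goroutine, so the outcome
// is deterministic.
func lostUpdate(sold int) int {
    a := sold    // processor A reads the counter
    b := sold    // processor B reads the same value before A writes
    sold = a + 1 // A writes back its increment
    sold = b + 1 // B overwrites A's result with its own
    return sold
}

func main() {
    // Two sales were recorded, but the counter only advanced by one.
    fmt.Println(lostUpdate(10)) // prints 11
}
```

With real processors, whether this interleaving occurs depends on timing, which is what makes such bugs hard to reproduce.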

Threads and goroutines

A thread is the smallest unit of execution within a process that can be scheduled and run independently by a computer’s operating system. It represents a single sequence of instructions that the CPU executes, allowing a program to perform multiple tasks concurrently within the same process. A thread exists within a larger entity called a process, which is essentially a running program with its own memory space, resources, and state. A process can contain one or more threads, all sharing the same memory and resources (like open files or global variables) allocated to that process.

There is a limit to how many threads a program can manage efficiently. To enable even more parallelism, the Go programming language has its own concept of a thread called a goroutine. While a goroutine is not a thread in the traditional sense, it maps to conventional threads under the hood. The Go runtime uses a scheduler to map many goroutines onto a smaller number of threads. These threads are the actual threads recognized by the operating system: kernel-level entities with their own stack and execution context. A single thread in Go can execute multiple goroutines by switching between them efficiently. This makes goroutines much cheaper than OS threads: creating thousands or even millions of goroutines is practical, whereas spawning that many threads would exhaust system resources due to their larger memory footprint.

In some sense, Go blurs the distinction between concurrency and parallelism. Concurrency is about managing multiple tasks so they can make progress independently. Parallelism, however, involves executing multiple tasks simultaneously across multiple resources. While concurrency focuses on software design for task coordination and can work with or without multiple cores, parallelism relies on hardware to achieve true simultaneous execution. The two combine when a concurrent system leverages parallel resources for efficiency.

To start a goroutine, you only need to type the keyword ‘go’ followed by a function call:

go func() {
    fmt.Println("Canada")
}()
        

This spawns a goroutine, but the Go runtime decides which thread it runs on, potentially sharing that thread with other goroutines. Unfortunately, a program made of only this goroutine could be disappointing:

package main

import (
    "fmt"
)

func main() {
    go func() {
        fmt.Println("Canada")
    }()
}
        

The problem is that the main function might end before the goroutine has had a chance to run. In Go, goroutines run concurrently, and the main function (which runs in the main goroutine) does not automatically wait for other goroutines to complete. If the main goroutine exits, the program terminates, potentially before other goroutines finish. To ensure a goroutine terminates before the program ends, the simplest approach is to synchronize the main goroutine with the spawned goroutine using a mechanism like a channel or a WaitGroup.

In Go, a channel is a built-in construct that lets goroutines communicate with each other and synchronize their execution. A channel has a type and it is created with the make function:

ch := make(chan int) // A channel that carries integers
        

The keyword chan declares a channel. The type after chan (e.g., int) defines what kind of data the channel can transport.

We use the <- operator to send a value into a channel.

ch <- 42 // Send the value 42 into the channel
        

We use the <- operator to receive a value from a channel.

value := <-ch // Receive a value from the channel and store it in 'value'
        

We use the close function to indicate no more data will be sent: close(ch). Sending to a closed channel causes a panic. The following program would print ‘Canada’:

package main

import "fmt"

func main() {
    ch := make(chan string) // Create a channel for strings

    go func() {
        ch <- "Canada" // Send a message to the channel
    }()

    msg := <-ch // Receive the message in the main goroutine
    fmt.Println(msg)
}
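
To illustrate close, here is a minimal sketch, assuming a hypothetical helper named collect. It drains a channel with a range loop, which stops once the channel is closed, and then shows what a receive on a closed channel returns.

```go
package main

import "fmt"

// collect drains a channel until it is closed and returns what it received.
func collect(ch <-chan string) []string {
    var out []string
    for msg := range ch { // the loop ends once ch is closed and empty
        out = append(out, msg)
    }
    return out
}

func main() {
    ch := make(chan string)

    go func() {
        defer close(ch) // signal that no more values will be sent
        ch <- "Canada"
        ch <- "Mexico"
    }()

    fmt.Println(collect(ch)) // [Canada Mexico]

    // Receiving from a closed channel does not block: it yields the
    // zero value, and the second result reports false.
    msg, ok := <-ch
    fmt.Printf("%q %v\n", msg, ok) // "" false
}
```

It is usually the sender that closes the channel, since it is the one that knows no more values are coming.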
        

The following program illustrates how we might use a channel to wait for a goroutine to terminate:

package main

import (
    "fmt"
)

func main() {
    channel := make(chan bool) // Create a channel to signal completion

    go func() {
        fmt.Println("Canada")
        channel <- true // Signal that the goroutine is done
    }()

    <-channel // Wait for the goroutine to signal completion
}

        

The goroutine sends a value (true) to the channel when it finishes. The main function blocks at <-channel, waiting to receive from the channel, ensuring it does not exit until the goroutine completes. By default a channel is unbuffered: it holds no values, so a send blocks until another goroutine is ready to receive, and a receive blocks until another goroutine sends. If you pass a capacity to make, you get a buffered channel, where a send blocks only once the buffer is full:

ch := make(chan int, 2)
ch <- 1 // Does not block (buffer has space)
ch <- 2 // Does not block (buffer is now full)
ch <- 3 // Blocks until a value is received
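
One way to observe buffering without blocking is through the built-in len and cap functions, which report how many values a channel currently holds and how many it can hold. The following is a minimal sketch; the helper describe is a name made up for this illustration.

```go
package main

import "fmt"

// describe reports how full a channel is without sending or receiving.
func describe(ch chan int) string {
    return fmt.Sprintf("len=%d cap=%d", len(ch), cap(ch))
}

func main() {
    unbuffered := make(chan int)
    fmt.Println(describe(unbuffered)) // len=0 cap=0

    buffered := make(chan int, 2)
    buffered <- 1
    buffered <- 2
    fmt.Println(describe(buffered)) // len=2 cap=2: a third send would block

    fmt.Println(<-buffered)         // 1: values come out in the order they went in
    fmt.Println(describe(buffered)) // len=1 cap=2: there is room again
}
```

An unbuffered channel reports a capacity of zero, which is why every send on it has to wait for a matching receive.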
        

In Go, you can pass multiple channels to a function just like any other arguments. Channels are first-class values in Go, meaning they can be passed as parameters, returned from functions, or stored in variables. When passing several channels to a function, you simply include them in the function’s parameter list, specifying their types. Let us consider an example where we access two URLs:

package main

import (
    "fmt"
    "net/http"
    "time"
)

// Response struct to hold URL and its fetch result
type Response struct {
    url    string
    status string
    err    error
}

func fetchURL(url string, ch chan Response) {
    // Create HTTP client with timeout
    client := &http.Client{
        Timeout: 10 * time.Second,
    }

    // Make HTTP GET request
    resp, err := client.Get(url)
    if err != nil {
        ch <- Response{url, "", err}
        return
    }
    defer resp.Body.Close()

    ch <- Response{url, resp.Status, nil}
}

func main() {
    // Record start time
    startTime := time.Now()
    // Create channel for responses
    ch := make(chan Response)

    // URLs to fetch
    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
    }

    // Start goroutines for each URL
    for _, url := range urls {
        go fetchURL(url, ch)
    }

    // Collect responses
    for i := 0; i < len(urls); i++ {
        resp := <-ch
        if resp.err != nil {
            fmt.Printf("Error fetching %s: %v\n", resp.url, resp.err)
        } else {
            fmt.Printf("Successfully fetched %s: %s\n", resp.url, resp.status)
        }
    }

    // Close the channel (optional since program ends here)
    close(ch)

    // Calculate and print elapsed time
    elapsed := time.Since(startTime)

    fmt.Printf("\nTotal time taken: %s\n", elapsed)
}

        

This program defines a Response struct to hold the URL, its status, and any error that occurred. It implements a fetchURL function that takes a URL and a channel as parameters, uses an HTTP client with a 10-second timeout, makes a GET request to the URL, and sends the result through the channel. It uses defer to ensure the response body is closed. In this instance, the channel can be written to or read from inside the function: to ensure that it can only be written to, we could declare the parameter as ch chan<- Response instead of ch chan Response. In the main function, we create a channel to receive responses, define two URLs to fetch, launch a goroutine for each URL, collect the responses from the channel, and print the results. When we run this program, it fetches both URLs concurrently using separate goroutines, uses the channel to communicate results back to the main goroutine, and prints the status (like “200 OK”) or any error for each URL. We can rewrite this program so that it is simpler, without goroutines, like so:

package main

import (
    "fmt"
    "net/http"
    "time"
)

// Response struct to hold URL and its fetch result
type Response struct {
    url    string
    status string
    err    error
}

func fetchURLSynchro(url string) Response {
    // Create HTTP client with timeout
    client := &http.Client{
        Timeout: 10 * time.Second,
    }

    // Make HTTP GET request
    resp, err := client.Get(url)
    if err != nil {
        return Response{url, "", err}
    }
    defer resp.Body.Close()

    return Response{url, resp.Status, nil}
}

func main() {
    // URLs to fetch
    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
    }
    startTime := time.Now()

    for i := 0; i < len(urls); i++ {
        resp := fetchURLSynchro(urls[i])
        if resp.err != nil {
            fmt.Printf("Error fetching %s: %v\n", resp.url, resp.err)
        } else {
            fmt.Printf("Successfully fetched %s: %s\n", resp.url, resp.status)
        }
    }
    elapsed := time.Since(startTime)
    fmt.Printf("\nTotal time taken: %s\n", elapsed)
}
        

The two programs do the same work, but one uses two goroutines (in addition to the main goroutine) while the other uses only the main goroutine. Testing these programs, you may find that the one using two goroutines completes faster: network accesses are typically expensive and easily parallelizable. That is, the two tasks can be done almost independently on your computer, even if executed simultaneously. Hence, you may find that we can query two URLs using HTTP requests in 250 ms whereas 400 ms is needed if the requests are consecutive, using a single goroutine.

However, you should not assume that using more goroutines always makes software run faster. It often does not. Furthermore, additional goroutines might trigger the use of additional processors, which increases the cost or power usage of your software. Adding more goroutines also makes your software more complicated, and more difficult to maintain and debug. Formally speaking, you do not need parallelism (i.e., many physical processors) to execute two network requests concurrently. Executing such requests does not require much processing time and has much to do with waiting for the network response. Therefore, it is a case where using goroutines is likely appropriate.

When splitting up more computational tasks into goroutines, you are less certain to get a performance boost. To illustrate the point, let us consider the case where we are summing all values in an array. We consider two cases: first a small array (100k elements) and then a large array with 10 million elements. For both cases, we can either use a simple function (with one goroutine) or a function that uses multiple goroutines. To maximize parallelism, we set the number of goroutines to the number of processors detected on the system by Go (runtime.NumCPU()).

package main

import (
    "fmt"
    "runtime"
    "testing"
)

// sequentialSum calculates the sum of an array sequentially
func sequentialSum(numbers []int) int {
    sum := 0
    for _, n := range numbers {
        sum += n
    }
    return sum
}

// goroutineSumWithChannels calculates the sum using goroutines and channels
func goroutineSumWithChannels(numbers []int) int {
    numGoroutines := runtime.NumCPU() // Use number of CPU cores
    chunkSize := (len(numbers) + numGoroutines - 1) / numGoroutines
    resultChan := make(chan int, numGoroutines) // Buffered channel for partial sums
    activeGoroutines := 0
    // Split the array into chunks and process with goroutines
    for i := 0; i < numGoroutines; i++ {
        start := i * chunkSize
        end := start + chunkSize
        if end > len(numbers) {
            end = len(numbers)
        }
        if start >= end {
            break
        }

        go func(slice []int) {
            partialSum := 0
            for _, n := range slice {
                partialSum += n
            }
            resultChan <- partialSum
        }(numbers[start:end])
        activeGoroutines++
    }

    // Collect partial sums from the channel
    total := 0
    for i := 0; i < activeGoroutines; i++ {
        total += <-resultChan
    }
    close(resultChan)

    return total
}

// Benchmark functions
func BenchmarkSequentialSum(b *testing.B) {
    numbers := make([]int, 100000)
    for i := range numbers {
        numbers[i] = i
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        sequentialSum(numbers)
    }
}

func BenchmarkGoroutineSumWithChannels(b *testing.B) {
    numbers := make([]int, 100000)
    for i := range numbers {
        numbers[i] = i
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        goroutineSumWithChannels(numbers)
    }
}

// Benchmark functions
func BenchmarkSequentialSumLarge(b *testing.B) {
    numbers := make([]int, 10000000)
    for i := range numbers {
        numbers[i] = i
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        sequentialSum(numbers)
    }
}

func BenchmarkGoroutineSumWithChannelsLarge(b *testing.B) {
    numbers := make([]int, 10000000)
    for i := range numbers {
        numbers[i] = i
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        goroutineSumWithChannels(numbers)
    }
}

func main() {
    fmt.Printf("Number of CPU cores: %d\n", runtime.NumCPU())

    res := testing.Benchmark(BenchmarkGoroutineSumWithChannels)
    fmt.Println("BenchmarkGoroutineSumWithChannels", res)
    ress := testing.Benchmark(BenchmarkSequentialSum)
    fmt.Println("BenchmarkSequentialSum", ress)

    resl := testing.Benchmark(BenchmarkGoroutineSumWithChannelsLarge)
    fmt.Println("BenchmarkGoroutineSumWithChannelsLarge", resl)
    ressl := testing.Benchmark(BenchmarkSequentialSumLarge)
    fmt.Println("BenchmarkSequentialSumLarge", ressl)
}
        

On a system with a large number of processors, we might get the following result:

Number of CPU cores: 128
BenchmarkGoroutineSumWithChannels     4048      258798 ns/op
BenchmarkSequentialSum    23756      50516 ns/op
BenchmarkGoroutineSumWithChannelsLarge      744    1414114 ns/op
BenchmarkSequentialSumLarge      237       5030224 ns/op
        

We see that when summing up the modest array, the approach using 128 goroutines takes five times longer. If it does end up using 128 processors, then it might be 128 * 5 = 640 times less efficient! The lesson is that if the task is sufficiently inexpensive, such as summing up a hundred thousand integers, you should not use more than one goroutine. In the instance where we are summing 10 million integers, the parallelized task is more interesting: it goes 3.6 times faster. Again, the single-goroutine approach is likely much more efficient: a single processor takes only 3.6 times longer than over one hundred goroutines.

The problem with a simple sum is that it is driven by memory accesses and is not especially computational. What if we consider a more expensive task? Let us sum the sine of the values of an array using various numbers of goroutines (1, 2, …). We use one million values in the array.

package main

import (
    "fmt"
    "math"
    "runtime"
    "testing"
)

func computeSineSum(numbers []int) float64 {
    sum := 0.0
    for _, n := range numbers {
        sum += math.Sin(float64(n))
    }
    return sum
}

// computeSineSumWithGoroutines computes the sum of sines with a specified number of goroutines
func computeSineSumWithGoroutines(numbers []int, numGoroutines int) float64 {
    chunkSize := (len(numbers) + numGoroutines - 1) / numGoroutines
    resultChan := make(chan float64, numGoroutines)

    for i := 0; i < numGoroutines; i++ {
        start := i * chunkSize
        end := start + chunkSize
        if end > len(numbers) {
            end = len(numbers)
        }
        if start >= end {
            break
        }

        go func(slice []int) {
            partialSum := 0.0
            for _, n := range slice {
                partialSum += math.Sin(float64(n))
            }
            resultChan <- partialSum
        }(numbers[start:end])
    }

    // Collect results
    total := 0.0
    activeGoroutines := (len(numbers) + chunkSize - 1) / chunkSize
    for i := 0; i < activeGoroutines; i++ {
        total += <-resultChan
    }
    close(resultChan)
    return total
}

// Benchmarks
func BenchmarkSequential(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000 // Keep numbers manageable
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSum(numbers)
    }
}

func Benchmark1Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 1)
    }
}

func Benchmark2Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 2)
    }
}

func Benchmark4Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 4)
    }
}

func Benchmark8Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 8)
    }
}

func Benchmark16Goroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, 16)
    }
}

func BenchmarkMaxGoroutines(b *testing.B) {
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = i % 1000
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        computeSineSumWithGoroutines(numbers, runtime.NumCPU())
    }
}

func main() {

    fmt.Printf("CPU cores: %d\n", runtime.NumCPU())
    res1 := testing.Benchmark(BenchmarkSequential)
    fmt.Println("BenchmarkSequential", res1)
    res11 := testing.Benchmark(Benchmark1Goroutines)
    fmt.Println("Benchmark1Goroutines", res11)
    res2 := testing.Benchmark(Benchmark2Goroutines)
    fmt.Println("Benchmark2Goroutines", res2)
    res4 := testing.Benchmark(Benchmark4Goroutines)
    fmt.Println("Benchmark4Goroutines", res4)
    res8 := testing.Benchmark(Benchmark8Goroutines)
    fmt.Println("Benchmark8Goroutines", res8)
    res16 := testing.Benchmark(Benchmark16Goroutines)
    fmt.Println("Benchmark16Goroutines", res16)
    resmax := testing.Benchmark(BenchmarkMaxGoroutines)
    fmt.Println("BenchmarkMaxGoroutines", resmax)
}
        

On a powerful machine with many cores, we might get the following results:

CPU cores: 128
Benchmark1Goroutines 114 13701908 ns/op
Benchmark2Goroutines 134 8913817 ns/op
Benchmark4Goroutines 253 4648170 ns/op
Benchmark8Goroutines 472 2272842 ns/op
Benchmark16Goroutines 835 1227975 ns/op
BenchmarkMaxGoroutines 916 1189217 ns/op

Going from one goroutine to two improves the speed by a factor of 1.5. Going from one goroutine to 16 goroutines improves the speed by a factor of 11. Increasing the number of goroutines beyond 16 brings almost no further gain. This pattern of sublinear gains with an upper limit is rather typical.
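
The speedup factors can be recomputed from the timings reported above. The sketch below also derives a parallel efficiency, defined here as the speedup divided by the number of goroutines; that metric and the helper names speedup and efficiency are additions for illustration, not part of the benchmark program.

```go
package main

import "fmt"

// speedup returns how many times faster a run is than the baseline.
func speedup(baselineNs, ns float64) float64 {
    return baselineNs / ns
}

// efficiency is the speedup divided by the number of goroutines:
// 1.0 would mean perfect linear scaling.
func efficiency(baselineNs, ns float64, goroutines int) float64 {
    return speedup(baselineNs, ns) / float64(goroutines)
}

func main() {
    // Timings (ns/op) copied from the benchmark output above.
    baseline := 13701908.0 // one goroutine
    runs := []struct {
        goroutines int
        ns         float64
    }{
        {2, 8913817},
        {4, 4648170},
        {8, 2272842},
        {16, 1227975},
        {128, 1189217}, // runtime.NumCPU() on the test machine
    }
    for _, r := range runs {
        fmt.Printf("%3d goroutines: speedup %.1f, efficiency %.2f\n",
            r.goroutines, speedup(baseline, r.ns), efficiency(baseline, r.ns, r.goroutines))
    }
}
```

On these numbers the efficiency stays between roughly 0.70 and 0.77 up to 16 goroutines, then drops to about 0.09 with 128 goroutines: the extra processors are mostly wasted.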

Yet goroutines and channels can be remarkably efficient in their own right. Let us create a chain of channels. Each goroutine has an input channel and an output channel. As soon as data is received in the input channel, it is written to the output channel. We link thousands of goroutines in a chain of input and output channels:

package main

import (
    "fmt"
    "time"
)

// relay function represents each goroutine in the chain
func relay(input <-chan int, output chan<- int) {
    // Wait for value from input channel
    value := <-input
    // Send value to output channel
    output <- value
}

func main() {
    // Number of goroutines in the chain
    const chainLength = 10000

    // Create slice to hold all channels
    channels := make([]chan int, chainLength+1)

    // Initialize all channels
    for i := range channels {
        channels[i] = make(chan int)
    }

    // Start timing
    startTime := time.Now()

    // Create the chain of goroutines
    for i := 0; i < chainLength; i++ {
        go relay(channels[i], channels[i+1])
    }

    // Send initial value into the first channel
    go func() {
        channels[0] <- 42
    }()

    // Wait for and receive the value from the last channel
    result := <-channels[chainLength]

    // Calculate elapsed time
    elapsed := time.Since(startTime)

    // Print results
    fmt.Printf("Value %d successfully passed through %d goroutines\n", result, chainLength)
    fmt.Printf("Time taken: %v\n", elapsed)
    fmt.Printf("Average time per hop: %v\n", elapsed/time.Duration(chainLength))
}

        

Running this program, you may get the following result:

Value 42 successfully passed through 10000 goroutines
Time taken: 13.987416ms
Average time per hop: 1.398μs
        

Effectively, at about 1.4 μs per hop, you can traverse roughly 700,000 goroutines per second in this manner.

Wait groups

Another common approach for managing multiple goroutines is to use sync.WaitGroup. Before we give an example, we need to review the keyword defer. In Go, the defer keyword is used to schedule a function call to be executed just before the surrounding function (the one containing the defer statement) returns. For example, the following function would print Canada followed by Mexico:

func() {
    defer fmt.Println("Mexico") // runs when the function returns
    fmt.Println("Canada")
}()
        

Let us consider a goroutine that combines defer with a wait group:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1) // Increment the WaitGroup counter by 1

    go func() {
        defer wg.Done() // Decrement the counter when the goroutine completes
        fmt.Println("Canada")
    }()

    wg.Wait() // Wait for the counter to reach 0
}
        

For a single goroutine like in our example, the channel approach is simpler because it requires fewer lines and no additional imports. However, if you have multiple goroutines, the wait group might be simpler.
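
As a minimal sketch of the multiple-goroutine case, the following program waits for several goroutines with one WaitGroup. The function name squares and the task itself are made up for illustration.

```go
package main

import (
    "fmt"
    "sync"
)

// squares computes n*n for each input in its own goroutine and waits
// for all of them with a single WaitGroup.
func squares(inputs []int) []int {
    results := make([]int, len(inputs))
    var wg sync.WaitGroup

    for i, n := range inputs {
        wg.Add(1) // register the goroutine before starting it
        go func(i, n int) {
            defer wg.Done()
            results[i] = n * n // each goroutine writes only its own slot
        }(i, n)
    }

    wg.Wait() // blocks until every goroutine has called Done
    return results
}

func main() {
    fmt.Println(squares([]int{1, 2, 3, 4})) // [1 4 9 16]
}
```

Note that wg.Add is called in the launching goroutine, before the go statement, so that wg.Wait cannot return before a goroutine has been counted.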

Atomics

If you need to read data from different goroutines, that is not a problem as long as the data remains constant. If nobody writes to the data, there is no problem. Unfortunately, we often need to change the data, while reading it from different goroutines. Sometimes you can use channels to communicate. But that is sometimes not enough. Let us consider an example. We take an array of 10 integers, and goroutines randomly decrement one array element and then increment another array element. Initially, the sum of all elements should be 1000 and it should remain 1000 unless there is a bug. We can implement our code like so:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {

    // Initialize array with 10 elements, each set to 100
    arr := [10]int{100, 100, 100, 100, 100, 100, 100, 100, 100, 100}
    var wg sync.WaitGroup

    // Function for goroutine behavior
    worker := func() {
        defer wg.Done()
        r := rand.New(rand.NewSource(time.Now().UnixNano()))

        // Run for 200000000 iterations as an example
        for i := 0; i < 200000000; i++ {
            // Pick first random index
            idx1 := r.Intn(10)
            // Only proceed if value > 0
            if arr[idx1] > 0 {
                // Decrement first element (unsynchronized: data race)
                arr[idx1]--

                // Pick second random index
                idx2 := r.Intn(10)
                // Increment second element (unsynchronized: data race)
                arr[idx2]++

            }
        }
    }

    // Register both goroutines before launching them, so that
    // wg.Wait() cannot return before they have started
    wg.Add(2)
    go worker()
    go worker()
    fmt.Println("waiting...")
    wg.Wait()
    fmt.Println("waiting...ok")

    fmt.Println("\nFinal array state:", arr)
    // Verify total sum should still be 1000 (10 * 100)
    sum := 0
    for _, val := range arr {
        sum += val
    }
    fmt.Println("Total sum:", sum)
}

        

This program is wrong: it contains data races because we are writing and reading data from different goroutines without synchronization. A possible output of this program is the following:

Final array state: [3001 644 880 324 2319 2845 3664 160 232 1741]
Total sum: 15810
        

Observe how the sum is higher than expected.

In Go you can avoid such a bug with the guarantee of atomicity provided by the sync/atomic package, which ensures that operations like increments are executed as indivisible steps, preventing race conditions. Functions like atomic.AddInt32(&x, 1) or atomic.AddInt64(&x, 1) ensure that the increment operation (read-modify-write) is performed atomically. This means that even if two threads execute the increment concurrently, the operations are serialized at the hardware level, and no updates are lost.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

func main() {

    // Initialize array with 10 elements, each set to 100
    arr := [10]int32{100, 100, 100, 100, 100, 100, 100, 100, 100, 100}
    var wg sync.WaitGroup

    // Function for goroutine behavior
    worker := func() {
        defer wg.Done()
        r := rand.New(rand.NewSource(time.Now().UnixNano()))

        // Run for 200000000 iterations as an example
        for i := 0; i < 200000000; i++ {
            // Pick first random index
            idx1 := r.Intn(10)
            // Only proceed if value > 0
            val := atomic.LoadInt32(&arr[idx1])
            if val > 0 {
                // Decrement only if nobody changed the value since the load
                if atomic.CompareAndSwapInt32(&arr[idx1], val, val-1) {
                    // Pick second random index
                    idx2 := r.Intn(10)
                    // Increment second element
                    atomic.AddInt32(&arr[idx2], 1)
                }

            }
        }
    }

    // Register both goroutines before launching them, so that
    // wg.Wait() cannot return before they have started
    wg.Add(2)
    go worker()
    go worker()
    fmt.Println("waiting...")
    wg.Wait()
    fmt.Println("waiting...ok")

    fmt.Println("\nFinal array state:", arr)
    // Verify total sum should still be 1000 (10 * 100)
    sum := 0
    for _, val := range arr {
        sum += int(val)
    }
    fmt.Println("Total sum:", sum)
}

        

The expression atomic.LoadInt32(&arr[idx1]) atomically reads the value at array position idx1. The value is stored in a local variable (val): data races are not possible with a local variable. We then use a Compare-And-Swap (CAS) operation: atomic.CompareAndSwapInt32(&arr[idx1], val, val-1). It checks if arr[idx1] still equals val (the previously loaded value) and if true, it sets arr[idx1] to val-1. It returns true if successful, false if the value changed since the load. Importantly, it executes as a single atomic operation. Finally, we use atomic.AddInt32(&arr[idx2], 1) to atomically add 1 to arr[idx2].
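
In the program above, a failed compare-and-swap simply means the iteration is skipped. When the update must eventually be applied, a common variation is to retry in a loop. The sketch below assumes a hypothetical helper, decrementIfPositive, and a made-up stock counter.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// decrementIfPositive subtracts 1 from *addr unless it is already 0 or
// less. It retries when another goroutine changed the value between
// the load and the compare-and-swap.
func decrementIfPositive(addr *int32) bool {
    for {
        val := atomic.LoadInt32(addr)
        if val <= 0 {
            return false // nothing left to take
        }
        if atomic.CompareAndSwapInt32(addr, val, val-1) {
            return true // our update was applied
        }
        // The value changed under us: load it again and retry.
    }
}

func main() {
    var stock int32 = 5
    var taken int32
    var wg sync.WaitGroup

    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if decrementIfPositive(&stock) {
                atomic.AddInt32(&taken, 1)
            }
        }()
    }
    wg.Wait()

    // Eight goroutines competed for five units: exactly five succeed.
    fmt.Println(atomic.LoadInt32(&stock), atomic.LoadInt32(&taken)) // 0 5
}
```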

If you run the new program, the sum of the values in the array is maintained. The program is safe.
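
The same approach handles the counter of products sold mentioned at the start. Here is a minimal sketch, assuming a hypothetical function countSales in which several goroutines record sales on one shared counter.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// countSales has `workers` goroutines each record `perWorker` sales
// on a shared counter using atomic increments.
func countSales(workers, perWorker int) int64 {
    var sold int64
    var wg sync.WaitGroup

    wg.Add(workers)
    for w := 0; w < workers; w++ {
        go func() {
            defer wg.Done()
            for i := 0; i < perWorker; i++ {
                atomic.AddInt64(&sold, 1) // indivisible read-modify-write
            }
        }()
    }
    wg.Wait()

    return atomic.LoadInt64(&sold)
}

func main() {
    fmt.Println(countSales(4, 100000)) // 400000
}
```

Replacing the atomic increment with a plain sold++ would reintroduce the data race, and the final count could then fall short.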

Mutex

Atomic operations (like atomic.AddInt32 or atomic.CompareAndSwapInt32) are designed for single, indivisible operations on a single variable. They become insufficient when we have more complex data structures.

In these more complex cases, we use a mutex. A mutex (short for “mutual exclusion”) is a synchronization primitive used in concurrent programming to prevent multiple threads or processes from simultaneously accessing or modifying a shared resource. It ensures that only one thread (or goroutine) can enter a critical section of code at a time, thus avoiding race conditions and maintaining data consistency. Essentially, only one ‘lock’ can be held at any given time.
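
As a first small sketch, consider a map guarded by a mutex. Go maps are not safe for concurrent writes, so every access goes through Lock and Unlock. The Inventory type and its methods are hypothetical names chosen for this illustration.

```go
package main

import (
    "fmt"
    "sync"
)

// Inventory is a hypothetical type: a map of product names to units
// sold, guarded by a mutex.
type Inventory struct {
    mu   sync.Mutex
    sold map[string]int
}

// Record adds one sale; only one goroutine at a time runs the
// critical section between Lock and Unlock.
func (inv *Inventory) Record(product string) {
    inv.mu.Lock()
    defer inv.mu.Unlock()
    inv.sold[product]++
}

// Sold returns the number of units recorded for a product.
func (inv *Inventory) Sold(product string) int {
    inv.mu.Lock()
    defer inv.mu.Unlock()
    return inv.sold[product]
}

func main() {
    inv := &Inventory{sold: make(map[string]int)}
    var wg sync.WaitGroup

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            inv.Record("maple syrup")
        }()
    }
    wg.Wait()

    fmt.Println(inv.Sold("maple syrup")) // 100
}
```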

To illustrate, let us create a program where money is transferred between two accounts, and we need to ensure that the withdrawal from one account and deposit to another happen together without interference from other goroutines. This requires protecting a multi-step operation, which goes beyond what atomic operations can do.

package main

import (
    "fmt"
    "sync"
)

type Bank struct {
    accounts map[string]int // Map of account IDs to balances
    mutex    sync.Mutex    // Mutex to protect the entire transfer operation
}

func NewBank() *Bank {
    return &Bank{
        accounts: map[string]int{
            "Alice": 1000,
            "Bob":   500,
        },
    }
}

func (b *Bank) Transfer(from, to string, amount int, wg *sync.WaitGroup) {
    defer wg.Done()

    // Lock the mutex to protect the entire transfer operation
    b.mutex.Lock()
    defer b.mutex.Unlock() // Ensure unlock happens even if there's a panic

    // Check if source account has sufficient funds
    if b.accounts[from] >= amount {
        // Perform the transfer: two related operations
        b.accounts[from] -= amount
        b.accounts[to] += amount
        fmt.Printf("Transferred %d from %s to %s. New balances: %s=%d, %s=%d\n",
            amount, from, to, from, b.accounts[from], to, b.accounts[to])
    } else {
        fmt.Printf("Failed transfer of %d from %s to %s: insufficient funds\n",
            amount, from, to)
    }
}

func (b *Bank) GetBalance(account string) int {
    b.mutex.Lock()
    defer b.mutex.Unlock()
    return b.accounts[account]
}

func main() {
    bank := NewBank()
    var wg sync.WaitGroup

    // Launch multiple concurrent transfers
    wg.Add(4)
    go bank.Transfer("Alice", "Bob", 200, &wg)
    go bank.Transfer("Bob", "Alice", 100, &wg)
    go bank.Transfer("Alice", "Bob", 300, &wg)
    go bank.Transfer("Bob", "Alice", 50, &wg)

    wg.Wait()

    fmt.Printf("Final balances: Alice=%d, Bob=%d\n",
        bank.GetBalance("Alice"), bank.GetBalance("Bob"))
}
        

In general, acquiring and releasing a mutex introduces overhead even when there is no contention, and under contention goroutines must wait for the lock. This can slow down a program compared to lock-free alternatives like atomic operations. In complex cases, it is also possible to trigger a deadlock. A deadlock is a concurrency failure where threads are trapped in a circular wait for resources, unable to proceed due to mutual dependencies. We can modify our example to include a deadlock. Instead of a global mutex, we create a mutex per account. If each goroutine locks the source account and then the destination account, two transfers in opposite directions can each hold one lock while waiting for the other, and a deadlock becomes possible.

package main

import (
    "fmt"
    "sync"
    "time"
)

type Account struct {
    balance int
    mutex   sync.Mutex
}

type Bank struct {
    accounts map[string]*Account // Map of account IDs to account objects with individual mutexes
}

func NewBank() *Bank {
    return &Bank{
        accounts: map[string]*Account{
            "Alice": {balance: 1000},
            "Bob":   {balance: 500},
        },
    }
}

func (b *Bank) Transfer(from, to string, amount int, wg *sync.WaitGroup) {
    defer wg.Done()

    // Get the accounts
    fromAccount := b.accounts[from]
    toAccount := b.accounts[to]

    // Lock the "from" account first
    fromAccount.mutex.Lock()
    fmt.Printf("Locked %s for transfer of %d to %s\n", from, amount, to)

    // Simulate some work to increase chance of deadlock (optional, but helps demonstrate)
    time.Sleep(100 * time.Millisecond)

    // Then try to lock the "to" account
    toAccount.mutex.Lock()
    fmt.Printf("Locked %s for transfer of %d from %s\n", to, amount, from)

    // Perform the transfer
    if fromAccount.balance >= amount {
        fromAccount.balance -= amount
        toAccount.balance += amount
        fmt.Printf("Transferred %d from %s to %s. New balances: %s=%d, %s=%d\n",
            amount, from, to, from, fromAccount.balance, to, toAccount.balance)
    } else {
        fmt.Printf("Failed transfer of %d from %s to %s: insufficient funds\n",
            amount, from, to)
    }

    // Unlock both accounts
    toAccount.mutex.Unlock()
    fromAccount.mutex.Unlock()
}

func (b *Bank) GetBalance(account string) int {
    acc := b.accounts[account]
    acc.mutex.Lock()
    defer acc.mutex.Unlock()
    return acc.balance
}

func main() {
    bank := NewBank()
    var wg sync.WaitGroup

    // Launch two transfers in opposite directions to create deadlock
    wg.Add(2)
    go bank.Transfer("Alice", "Bob", 200, &wg) // Alice -> Bob
    go bank.Transfer("Bob", "Alice", 100, &wg) // Bob -> Alice

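    // This never returns: both goroutines block forever. Once every goroutine
    // is blocked, the Go runtime typically aborts the program with
    // "fatal error: all goroutines are asleep - deadlock!".
    wg.Wait()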

    fmt.Printf("Final balances: Alice=%d, Bob=%d\n",
        bank.GetBalance("Alice"), bank.GetBalance("Bob"))
}
        

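The deadlock in this code occurs because the two goroutines acquire the same mutexes in opposite orders, leading to a circular wait: one goroutine holds Alice's mutex and waits for Bob's, while the other holds Bob's mutex and waits for Alice's. One strategy to avoid such a deadlock is to impose a fixed order on the mutexes. For example, if accounts are numbered, we always lock the account with the smaller number first.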

Conclusion

Concurrency is a powerful tool in modern software development, enabling programs to leverage multiple processors for improved performance. However, it introduces significant complexities that must be carefully managed. Data races, where unsynchronized access to shared memory leads to unpredictable outcomes, underscore the need for robust synchronization mechanisms. Go’s goroutines and channels offer an elegant, lightweight approach to concurrency, allowing developers to efficiently parallelize tasks like network requests or data processing while avoiding the overhead of traditional threads. Yet, the performance benefits of parallelism are not guaranteed—simple tasks may suffer from excessive goroutine overhead, while computationally intensive operations can see substantial gains, albeit with diminishing returns as the number of goroutines increases.

Synchronization tools like sync.WaitGroup, atomic operations from sync/atomic, and mutexes (sync.Mutex) provide essential safeguards against concurrency pitfalls. Atomics excel for single-variable updates, ensuring thread safety with minimal overhead, while mutexes protect multi-step operations on complex data structures. However, mutexes come with risks, such as deadlocks, which arise from circular dependencies and require careful design—like consistent lock ordering—to avoid. Choosing the right concurrency strategy depends on the task’s nature, scale, and performance requirements. Ultimately, effective concurrent programming in Go demands a balance between leveraging parallelism for speed and maintaining simplicity, correctness, and efficiency in the face of shared resource contention.

