Lewislbr

Limiting concurrency in Go

Sometimes concurrency in tasks that require access to resources that can easily be saturated, like I/O or network operations, can lead to undesired results. There are some patterns to limit the number of active goroutines that can exist at the same time.

The maximum number of goroutines in this kind of operations is given by the machine's maximum number of file descriptors, which can be checked with ulimit -n.

Worker pool

A worker pool limits the number of active goroutines by creating a queue of jobs and distributing them among different workers, which will pick them one by one.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	const tasks = 10
	const workers = 2

	wg := sync.WaitGroup{}
	jobs := make(chan int, tasks)

	// Creating a worker pool
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		i := i
		go func() {
			for j := range jobs {
				n := generateNumber()
				fmt.Printf("Worker %d processing job %d with result %d.\n", i, j, n)
			}
			wg.Done()
		}()
	}

	// Sending tasks to the job queue
	for i := 0; i < tasks; i++ {
		jobs <- i
	}

	close(jobs)

	wg.Wait()
}

func generateNumber() int {
	time.Sleep(time.Second)

	return rand.Intn(10)
}

// Output:
// Worker 0 processing job 0 with result 1.
// Worker 1 processing job 1 with result 7.
// Worker 1 processing job 3 with result 7.
// Worker 0 processing job 2 with result 9.
// Worker 0 processing job 5 with result 1.
// Worker 1 processing job 4 with result 8.
// Worker 1 processing job 7 with result 5.
// Worker 0 processing job 6 with result 0.
// Worker 0 processing job 9 with result 6.
// Worker 1 processing job 8 with result 0.

Semaphore

A semaphore defines a limit on how many goroutines can be active by using a buffered channel to keep track of the goroutines that are running.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	const tasks = 10
	const limit = 2

	wg := sync.WaitGroup{}
	sem := make(chan bool, limit) // Creating a semaphore

	wg.Add(tasks)
	for i := 0; i < tasks; i++ {
		i := i
		sem <- true // Taking a spot of the semaphore
		go func() {
			n := generateNumber()
			fmt.Printf("Processing job %d with result %d.\n", i, n)
			<-sem // Releasing the spot
			wg.Done()
		}()
	}

	wg.Wait()
}

func generateNumber() int {
	time.Sleep(time.Second)

	return rand.Intn(10)
}

// Output:
// Processing job 1 with result 1.
// Processing job 0 with result 7.
// Processing job 3 with result 7.
// Processing job 2 with result 9.
// Processing job 5 with result 1.
// Processing job 4 with result 8.
// Processing job 6 with result 5.
// Processing job 7 with result 0.
// Processing job 8 with result 6.
// Processing job 9 with result 0.



If you're using dark mode, do you like the code blocks's theme? I have it available for VS Code, feel free to check it.