
Mastering Worker Pools in Go: From Basic to Advanced Concurrency Patterns
Concurrency is one of Go’s superpowers, thanks to its lightweight goroutines and channels. However, spawning unlimited goroutines can lead to resource exhaustion, especially in high-load scenarios like processing large datasets or handling API requests. This is where worker pools come in—a design pattern that limits the number of concurrent workers (goroutines) while efficiently distributing tasks.
Why Use Worker Pools?
- Resource Control: Cap the number of goroutines so they cannot overwhelm your system (e.g., CPU, memory, or external resources like database connections).
- Efficiency: Reuse goroutines for multiple tasks instead of creating new ones each time.
- Scalability: Ideal for task queues in web servers, data processing pipelines, or batch jobs.
A typical worker pool involves:
- A task channel to queue jobs.
- A fixed number of worker goroutines that pull tasks from the channel.
- Synchronization to wait for all tasks to complete.
Let’s start simple.
Basic Worker Pool: Using Channels Only (No WaitGroup)
In this version, we avoid sync.WaitGroup entirely. Instead, we use a done channel to signal task completion. Each worker sends a signal on the done channel after processing a task. The main goroutine counts these signals to know when everything is done.
This approach is lightweight but requires manual counting, which can be error-prone for large task sets.
package main

import (
	"fmt"
	"time"
)

func worker(id int, tasks <-chan int, done chan<- struct{}) {
	for task := range tasks {
		fmt.Printf("Worker %d processing task %d\n", id, task)
		time.Sleep(time.Second) // Simulate work
		done <- struct{}{}      // Signal completion
	}
}

func main() {
	numWorkers := 3
	numTasks := 10
	tasks := make(chan int, numTasks)
	done := make(chan struct{})

	// Start workers
	for i := 1; i <= numWorkers; i++ {
		go worker(i, tasks, done)
	}

	// Enqueue tasks
	for i := 1; i <= numTasks; i++ {
		tasks <- i
	}
	close(tasks) // No more tasks

	// Wait for all tasks to complete
	for i := 0; i < numTasks; i++ {
		<-done
	}
	fmt.Println("All tasks completed!")
}
How It Works:
- Tasks Channel: Buffered to hold all tasks upfront.
- Done Channel: Unbuffered; each task completion sends a signal.
- Closing the Tasks Channel: Tells workers to exit their loop.
- Waiting: Main loop receives exactly numTasks signals.
Pros: Simple, no extra sync primitives.
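Cons: If a worker ever skips the send on done (for example, by returning early on a failed task), main blocks forever waiting for a signal that never arrives. Not ideal for dynamic task addition either, since main must know numTasks in advance.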
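One way to soften that risk is to send the completion signal from a deferred call, so an early return inside the task still produces a signal. The sketch below is only an illustration under assumptions: process, handle, and runPool are hypothetical helpers that do not appear in the example above, and the failure is simulated with an early return.

```go
package main

import (
	"errors"
	"fmt"
)

// process is a hypothetical task handler; odd-numbered tasks fail (assumption).
func process(task int) error {
	if task%2 == 1 {
		return errors.New("simulated failure")
	}
	return nil
}

// handle runs one task and always signals on done, even on an early return.
func handle(id, task int, done chan<- struct{}) {
	defer func() { done <- struct{}{} }()
	if err := process(task); err != nil {
		fmt.Printf("Worker %d: task %d failed: %v\n", id, task, err)
		return
	}
	fmt.Printf("Worker %d: task %d ok\n", id, task)
}

func worker(id int, tasks <-chan int, done chan<- struct{}) {
	for task := range tasks {
		handle(id, task, done)
	}
}

// runPool processes numTasks tasks and returns how many signals it received.
func runPool(numWorkers, numTasks int) int {
	tasks := make(chan int, numTasks)
	done := make(chan struct{})
	for i := 1; i <= numWorkers; i++ {
		go worker(i, tasks, done)
	}
	for i := 1; i <= numTasks; i++ {
		tasks <- i
	}
	close(tasks)

	received := 0
	for i := 0; i < numTasks; i++ {
		<-done
		received++
	}
	return received
}

func main() {
	fmt.Println("signals received:", runPool(3, 10))
}
```

The deferred send covers early returns only. An unrecovered panic in a task would still terminate the whole program.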
Standard Worker Pool: With sync.WaitGroup
Now, let’s introduce sync.WaitGroup, Go’s built-in way to wait for a group of goroutines. This is more robust than manual counting because WaitGroup handles the synchronization atomically.
We add tasks to the WaitGroup before enqueuing them, and each worker calls Done() after processing.
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, tasks <-chan int, wg *sync.WaitGroup) {
	for task := range tasks {
		fmt.Printf("Worker %d processing task %d\n", id, task)
		time.Sleep(time.Second) // Simulate work
		wg.Done()               // Mark task as done
	}
}

func main() {
	numWorkers := 3
	numTasks := 10
	tasks := make(chan int, numTasks)
	var wg sync.WaitGroup

	// Start workers
	for i := 1; i <= numWorkers; i++ {
		go worker(i, tasks, &wg)
	}

	// Enqueue tasks and add to WaitGroup
	for i := 1; i <= numTasks; i++ {
		wg.Add(1) // Increment for each task
		tasks <- i
	}
	close(tasks)

	wg.Wait() // Block until all tasks are done
	fmt.Println("All tasks completed!")
}
Key Improvements:
- WaitGroup.Add(1): Called for each task before sending it.
- Worker Calls wg.Done(): Decrements the counter.
- wg.Wait(): Blocks main until counter hits zero.
Pros: Thread-safe, easier to manage varying task counts.
Cons: Still no error handling or cancellation.
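Note: We pass wg by pointer because a WaitGroup must not be copied after first use. A copy would carry its own counter, so calls to Done() in the worker would not affect the WaitGroup that main waits on (go vet reports such copies).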
Intermediate Pattern: Dynamic Task Submission Without Closing Channel Early
In real-world apps, tasks might arrive dynamically (e.g., from a network). We can’t close the tasks channel upfront. Instead, use a separate quit channel to signal workers to stop.
This version combines WaitGroup with a quit signal.
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, tasks <-chan int, quit <-chan struct{}, wg *sync.WaitGroup) {
	for {
		select {
		case task, ok := <-tasks:
			if !ok {
				return // Channel closed
			}
			fmt.Printf("Worker %d processing task %d\n", id, task)
			time.Sleep(time.Second)
			wg.Done()
		case <-quit:
			return // Graceful shutdown
		}
	}
}

func main() {
	numWorkers := 3
	numTasks := 10
	tasks := make(chan int)
	quit := make(chan struct{})
	var wg sync.WaitGroup

	// Start workers
	for i := 1; i <= numWorkers; i++ {
		go worker(i, tasks, quit, &wg)
	}

	// Submit tasks dynamically
	for i := 1; i <= numTasks; i++ {
		wg.Add(1)
		tasks <- i
		time.Sleep(500 * time.Millisecond) // Simulate dynamic arrival
	}

	// Signal quit and wait
	close(quit)
	wg.Wait()
	fmt.Println("All tasks completed!")
}
Enhancements:
- Select Statement: Workers check for tasks or quit signals.
- Dynamic Submission: No need to buffer all tasks; submit as they come.
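- Quit Channel: Closing it broadcasts shutdown to all workers at once.
This handles streaming tasks better but still lacks timeout or cancellation. Note that closing quit before wg.Wait() is safe here only because tasks is unbuffered: every submitted task has already been handed to a worker, which finishes it before checking quit again. With a buffered tasks channel, a worker could pick the quit case while tasks are still queued, and wg.Wait() would block forever.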
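The tasks/quit/WaitGroup trio can also be wrapped in a small type so that callers only see a submit call and a shutdown call. The following is a minimal sketch, not a definitive design: Pool, NewPool, Submit, and Shutdown are hypothetical names introduced for illustration, and the "work" is reduced to recording the task ID. This variant waits for outstanding tasks first and closes quit second, which keeps shutdown safe even if the tasks channel is later given a buffer.

```go
package main

import (
	"fmt"
	"sync"
)

// Pool is a hypothetical wrapper around the tasks/quit/WaitGroup trio.
type Pool struct {
	tasks chan int
	quit  chan struct{}
	wg    sync.WaitGroup // counts submitted tasks, not workers

	mu      sync.Mutex
	handled []int // task IDs processed so far
}

// NewPool starts numWorkers goroutines that wait for tasks.
func NewPool(numWorkers int) *Pool {
	p := &Pool{tasks: make(chan int), quit: make(chan struct{})}
	for i := 0; i < numWorkers; i++ {
		go p.worker()
	}
	return p
}

func (p *Pool) worker() {
	for {
		select {
		case task := <-p.tasks:
			p.mu.Lock()
			p.handled = append(p.handled, task) // stand-in for real work
			p.mu.Unlock()
			p.wg.Done()
		case <-p.quit:
			return
		}
	}
}

// Submit blocks until a worker accepts the task.
// It must not be called after (or concurrently with) Shutdown.
func (p *Pool) Submit(task int) {
	p.wg.Add(1)
	p.tasks <- task
}

// Shutdown waits for every submitted task, then stops the idle workers.
func (p *Pool) Shutdown() []int {
	p.wg.Wait()
	close(p.quit)
	p.mu.Lock()
	defer p.mu.Unlock()
	return append([]int(nil), p.handled...)
}

func main() {
	p := NewPool(3)
	for i := 1; i <= 5; i++ {
		p.Submit(i)
	}
	fmt.Println("handled:", len(p.Shutdown()), "tasks")
}
```

Because the WaitGroup counts tasks rather than workers, Shutdown returns once all submitted work is done. It does not wait for the worker goroutines themselves to exit.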
Advanced Pattern: With Context for Cancellation and Error Handling
For production-grade pools, integrate context.Context for timeouts and cancellation, plus a results channel for outputs and errors.
Each Result carries its own Err field, so failures are reported to the caller without a separate error channel and without panicking.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type Result struct {
	TaskID int
	Err    error
}

// By convention, ctx is the first parameter.
func worker(ctx context.Context, id int, tasks <-chan int, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done() // Decrement when worker exits
	for {
		select {
		case <-ctx.Done():
			return // Cancelled
		case task, ok := <-tasks:
			if !ok {
				return
			}
			fmt.Printf("Worker %d processing task %d\n", id, task)
			time.Sleep(time.Second)
			// Simulate potential error
			var err error
			if task%3 == 0 {
				err = errors.New("task failed")
			}
			results <- Result{TaskID: task, Err: err}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // Timeout after 5s
	defer cancel()

	numWorkers := 3
	numTasks := 10
	tasks := make(chan int, numTasks)
	results := make(chan Result, numTasks)
	var wg sync.WaitGroup

	// Start workers
	wg.Add(numWorkers) // Add for workers, not tasks
	for i := 1; i <= numWorkers; i++ {
		go worker(ctx, i, tasks, results, &wg)
	}

	// Enqueue tasks
	for i := 1; i <= numTasks; i++ {
		tasks <- i
	}
	close(tasks)

	// Close results once every worker has exited
	go func() {
		wg.Wait()      // Wait for workers to finish
		close(results) // Safe to close: no worker can send anymore
	}()

	// Process results
	for res := range results {
		if res.Err != nil {
			fmt.Printf("Error in task %d: %v\n", res.TaskID, res.Err)
		} else {
			fmt.Printf("Task %d completed successfully\n", res.TaskID)
		}
	}
	fmt.Println("Worker pool shutdown complete!")
}
Advanced Features:
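- context.WithTimeout: Cancels the context automatically once the deadline passes. Workers check ctx.Done() between tasks, so a task that is already running (here, the time.Sleep) finishes before the worker exits.
- Results Channel: Collects outputs and errors. Workers don’t call Done() per task; instead, wg tracks the workers themselves.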
- Deferred wg.Done(): Called when workers exit, ensuring cleanup.
- Error Simulation: Shows how to propagate errors.
Pros: Handles cancellation, timeouts, and errors gracefully. Scalable for real apps.
Cons: More complex; requires careful channel management.
Best Practices and Trade-offs
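- Buffering: Buffer the tasks and results channels so producers and workers block less often. A full buffer still blocks the sender, so size it to the expected backlog.
- Error Propagation: Always consider panic recovery in workers. Note that a bare defer recover() does not stop a panic, because recover() only takes effect when called directly inside a deferred function; use a deferred function literal that calls recover() instead.
- Scaling: For very large pools, consider golang.org/x/sync/errgroup, which offers simpler error handling and a SetLimit method for bounding concurrency.
- Without WaitGroup? Stick with WaitGroup for simplicity unless you’re in a constrained environment; manual channel counting works but is brittle.
- Performance: Benchmark with go test -bench; worker pools shine when tasks are I/O-bound.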
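The panic-recovery advice can be sketched as follows. This is one possible shape rather than a prescribed one: safeRun and runTasks are hypothetical helpers, and the panic on task 2 is simulated. The important detail is that recover() is called inside a deferred function literal.

```go
package main

import (
	"fmt"
	"sync"
)

// safeRun executes fn and converts a panic into an ordinary error.
func safeRun(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	fn()
	return nil
}

func worker(tasks <-chan int, errs chan<- error, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		err := safeRun(func() {
			if task == 2 {
				panic("boom") // simulated bug in task code (assumption)
			}
		})
		if err != nil {
			errs <- fmt.Errorf("task %d: %w", task, err)
		}
	}
}

// runTasks feeds numTasks tasks to numWorkers workers and returns all errors.
func runTasks(numWorkers, numTasks int) []error {
	tasks := make(chan int, numTasks)
	errs := make(chan error, numTasks) // large enough that workers never block
	var wg sync.WaitGroup

	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go worker(tasks, errs, &wg)
	}
	for i := 1; i <= numTasks; i++ {
		tasks <- i
	}
	close(tasks)
	wg.Wait()
	close(errs)

	var out []error
	for err := range errs {
		out = append(out, err)
	}
	return out
}

func main() {
	for _, err := range runTasks(3, 5) {
		fmt.Println(err)
	}
}
```

The worker that hits the panic keeps running and picks up the next task, so one bad task does not shrink the pool or crash the program.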
Worker pools evolve with your needs: start basic for quick prototypes, add layers for robustness. Experiment with these patterns in your next Go project!
Happy coding! 🚀