Pattern Overview
Why Concurrency Patterns Matter
The Problem: Goroutines and channels are primitives. Real systems need ergonomic, reusable patterns built from them.
The Solution: Worker pools, pipelines, fan-out/fan-in, rate limiters, and circuit breakers are the patterns that turn concurrency primitives into reliable services.
Real Impact: Once you recognise one of these patterns in a problem, you can often solve it in tens of lines rather than hundreds.
Real-World Analogy
Think of concurrency patterns as factory layouts:
- Worker pool = a fixed crew taking jobs from a single queue
- Pipeline = an assembly line — each station transforms and forwards
- Fan-out = splitting one stream into many parallel workers
- Fan-in = merging many streams back into one
- Rate limiter = the bouncer pacing how many jobs enter per second
Concurrency patterns are reusable solutions to common problems in concurrent programming. They help manage complexity, improve throughput, and keep concurrent code correct.
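As a rough illustration of the fan-out and fan-in entries above, here is a minimal sketch. The names `fanOut`, `fanIn`, and `sumOfSquares` are hypothetical and chosen only for this example. Several workers read from one input channel, and their outputs are merged back into a single channel.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut starts n workers that all read from in and apply fn.
// Each worker writes to its own output channel and closes it when in is drained.
func fanOut(in <-chan int, n int, fn func(int) int) []<-chan int {
	outs := make([]<-chan int, 0, n)
	for i := 0; i < n; i++ {
		out := make(chan int)
		go func() {
			defer close(out)
			for v := range in {
				out <- fn(v)
			}
		}()
		outs = append(outs, out)
	}
	return outs
}

// fanIn merges several channels into one and closes the merged channel
// once every input channel has been drained.
func fanIn(chans ...<-chan int) <-chan int {
	merged := make(chan int)
	var wg sync.WaitGroup
	for _, c := range chans {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				merged <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

// sumOfSquares is a hypothetical driver: it squares nums on several workers
// and adds up the merged results.
func sumOfSquares(nums []int, workers int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()
	square := func(x int) int { return x * x }
	sum := 0
	for v := range fanIn(fanOut(in, workers, square)...) {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3, 4, 5}, 3)) // 55
}
```

The merged channel delivers results in no particular order, so this shape suits aggregations such as sums or counts. If order matters, each result would need to carry its input index.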
Worker Pool Pattern
Manage a fixed number of goroutines to process work from a queue, controlling resource usage and preventing goroutine explosion.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Job represents a unit of work
type Job struct {
	ID      int
	Data    interface{}
	Handler func(context.Context, interface{}) error
}

// Result contains job execution results
type Result struct {
	JobID    int
	Output   interface{}
	Error    error
	Duration time.Duration
}

// WorkerPool manages concurrent workers
type WorkerPool struct {
	workers  int
	jobQueue chan Job
	results  chan Result
	ctx      context.Context
	cancel   context.CancelFunc
	wg       sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(ctx context.Context, workers int, queueSize int) *WorkerPool {
	poolCtx, cancel := context.WithCancel(ctx)
	wp := &WorkerPool{
		workers:  workers,
		jobQueue: make(chan Job, queueSize),
		results:  make(chan Result, queueSize),
		ctx:      poolCtx,
		cancel:   cancel,
	}
	wp.start()
	return wp
}

// start launches the fixed set of worker goroutines
func (wp *WorkerPool) start() {
	for i := 0; i < wp.workers; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}
}
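
// worker pulls jobs from the queue until the queue is closed or the pool
// context is cancelled.
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	for {
		select {
		case job, ok := <-wp.jobQueue:
			if !ok {
				return // queue closed: no more work
			}
			start := time.Now()
			output, err := wp.executeJob(job)
			res := Result{
				JobID:    job.ID,
				Output:   output,
				Error:    err,
				Duration: time.Since(start),
			}
			// Do not block forever on a full results channel if the pool
			// has been cancelled in the meantime.
			select {
			case wp.results <- res:
			case <-wp.ctx.Done():
				return
			}
		case <-wp.ctx.Done():
			return
		}
	}
}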
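
// executeJob runs a single job with a timeout and panic recovery.
func (wp *WorkerPool) executeJob(job Job) (interface{}, error) {
	// Create timeout context for job
	jobCtx, cancel := context.WithTimeout(wp.ctx, 30*time.Second)
	defer cancel()

	// The channel is buffered so the handler goroutine can always deliver
	// its outcome and exit, even if we have already returned on timeout.
	// Passing the error through the channel also avoids a data race on a
	// shared variable.
	done := make(chan error, 1)
	go func() {
		// Execute with panic recovery
		defer func() {
			if r := recover(); r != nil {
				done <- fmt.Errorf("panic: %v", r)
			}
		}()
		done <- job.Handler(jobCtx, job.Data)
	}()

	select {
	case err := <-done:
		// Handler returns only an error, so there is no output value to report.
		return nil, err
	case <-jobCtx.Done():
		// The handler keeps running until it notices jobCtx is done.
		return nil, jobCtx.Err()
	}
}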
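
// Submit adds a job to the queue without blocking. It returns an error if
// the queue is full or the pool has been cancelled. Submit must not be
// called after Shutdown, because sending on the closed queue would panic.
func (wp *WorkerPool) Submit(job Job) error {
	select {
	case wp.jobQueue <- job:
		return nil
	case <-wp.ctx.Done():
		return wp.ctx.Err()
	default:
		return fmt.Errorf("job queue full")
	}
}

// Results exposes the results channel for reading. Callers should keep
// draining it until it is closed, otherwise workers can block on a full
// channel and Shutdown will not return.
func (wp *WorkerPool) Results() <-chan Result {
	return wp.results
}

// Shutdown gracefully stops the worker pool: it stops accepting jobs, waits
// for queued jobs to finish, then closes the results channel.
func (wp *WorkerPool) Shutdown() {
	close(wp.jobQueue)
	wp.wg.Wait()
	close(wp.results)
	wp.cancel()
}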
Pipeline Pattern
Sequential Processing
Each stage processes data and passes it to the next stage.
Concurrent Stages
Each stage runs in its own goroutine for parallel processing.
Error Handling
Propagate errors through a dedicated error channel.
// Pipeline chains stages and supports cancellation.
// Error propagation is not implemented in this type.
type Pipeline struct {
	stages []Stage
	ctx    context.Context
	cancel context.CancelFunc
}

// Stage reads from an input channel and returns its output channel.
type Stage func(context.Context, <-chan interface{}) <-chan interface{}

func NewPipeline(ctx context.Context) *Pipeline {
	pipeCtx, cancel := context.WithCancel(ctx)
	return &Pipeline{
		ctx:    pipeCtx,
		cancel: cancel,
	}
}

func (p *Pipeline) AddStage(stage Stage) {
	p.stages = append(p.stages, stage)
}

// Run wires the stages together in order and returns the final output channel.
func (p *Pipeline) Run(input <-chan interface{}) <-chan interface{} {
	output := input
	for _, stage := range p.stages {
		output = stage(p.ctx, output)
	}
	return output
}

// Cancel stops every stage that is watching the pipeline context.
func (p *Pipeline) Cancel() {
	p.cancel()
}

// Example stages

// filterStage forwards only the values for which predicate returns true.
func filterStage(predicate func(interface{}) bool) Stage {
	return func(ctx context.Context, in <-chan interface{}) <-chan interface{} {
		out := make(chan interface{})
		go func() {
			defer close(out)
			for {
				select {
				case val, ok := <-in:
					if !ok {
						return
					}
					if predicate(val) {
						select {
						case out <- val:
						case <-ctx.Done():
							return
						}
					}
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// transformStage applies transform to every value and forwards the result.
func transformStage(transform func(interface{}) interface{}) Stage {
	return func(ctx context.Context, in <-chan interface{}) <-chan interface{} {
		out := make(chan interface{})
		go func() {
			defer close(out)
			for {
				select {
				case val, ok := <-in:
					if !ok {
						return
					}
					result := transform(val)
					select {
					case out <- result:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}
Rate Limiting Patterns
| Algorithm | Characteristics | Use Case |
|---|---|---|
| Token Bucket | Allows bursts, refills at constant rate | API rate limiting |
| Leaky Bucket | Smooth output rate, no bursts | Network traffic shaping |
| Sliding Window | Accurate rate calculation | Real-time monitoring |
| Fixed Window | Simple, resets at intervals | Billing periods |
// Token Bucket Rate Limiter
type TokenBucket struct {
	tokens     chan struct{}
	maxTokens  int
	refillRate time.Duration // interval between refills; one token is added per interval
	stop       chan struct{}
}

func NewTokenBucket(maxTokens int, refillRate time.Duration) *TokenBucket {
	tb := &TokenBucket{
		tokens:     make(chan struct{}, maxTokens),
		maxTokens:  maxTokens,
		refillRate: refillRate,
		stop:       make(chan struct{}),
	}
	// Fill initial tokens
	for i := 0; i < maxTokens; i++ {
		tb.tokens <- struct{}{}
	}
	go tb.refill()
	return tb
}

// refill adds one token per tick until the stop channel is closed.
func (tb *TokenBucket) refill() {
	ticker := time.NewTicker(tb.refillRate)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			select {
			case tb.tokens <- struct{}{}:
			default: // Bucket full
			}
		case <-tb.stop:
			return
		}
	}
}

// Allow takes a token if one is available and reports whether it did.
// It never blocks.
func (tb *TokenBucket) Allow() bool {
	select {
	case <-tb.tokens:
		return true
	default:
		return false
	}
}
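
// Wait blocks until a token is available or ctx is cancelled.
func (tb *TokenBucket) Wait(ctx context.Context) error {
	select {
	case <-tb.tokens:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Stop ends the refill goroutine. Call it exactly once when the limiter is
// no longer needed; otherwise the goroutine started by NewTokenBucket leaks.
func (tb *TokenBucket) Stop() {
	close(tb.stop)
}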
Best Practices
✅ Pattern Selection Guidelines
- ✓ Use Worker Pool to cap concurrency for CPU-bound or otherwise resource-limited tasks
- ✓ Use Pipeline for sequential data transformations
- ✓ Use Fan-In/Out for parallel processing with aggregation
- ✓ Use Rate Limiting to protect downstream services
- ✓ Use Circuit Breaker for fault tolerance
- ✓ Always provide graceful shutdown mechanisms
❌ Common Mistakes
- ✗ Creating unlimited goroutines
- ✗ Not handling context cancellation
- ✗ Ignoring backpressure in pipelines
- ✗ Missing error propagation
- ✗ Not closing channels properly
- ✗ Deadlocks from improper channel usage
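The first mistake in this list, unlimited goroutines, is often avoided with a buffered channel used as a semaphore. The sketch below is one way to do that. `runBounded` is a hypothetical helper, and the peak counter is there only so the bound can be observed.

```go
package main

import (
	"fmt"
	"sync"
)

// runBounded calls fn once per item while never running more than limit
// calls at the same time. It returns the highest number of calls that were
// observed in flight together.
func runBounded(items []int, limit int, fn func(int)) int {
	sem := make(chan struct{}, limit) // one slot per allowed goroutine
	var wg sync.WaitGroup
	var mu sync.Mutex
	inFlight, peak := 0, 0

	for _, item := range items {
		sem <- struct{}{} // blocks while limit goroutines are already running
		wg.Add(1)
		go func(item int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot after the bookkeeping below

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			fn(item)

			mu.Lock()
			inFlight--
			mu.Unlock()
		}(item)
	}
	wg.Wait()
	return peak
}

func main() {
	items := []int{0, 1, 2, 3, 4, 5, 6, 7}
	squares := make([]int, len(items))
	// Each call writes to its own index, so no extra locking is needed.
	peak := runBounded(items, 3, func(i int) { squares[i] = i * i })
	fmt.Println(squares, peak <= 3)
}
```

Because the loop blocks on the semaphore before starting each goroutine, the producer slows down to the pace of the workers, which is a simple form of backpressure.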
Practice Exercises
Exercise 1: Advanced Worker Pool
Extend the worker pool with priority queues, dynamic worker scaling, and metrics collection.
Exercise 2: Multi-Stage Pipeline
Build a data processing pipeline with error handling, retries, and progress tracking.
Exercise 3: Adaptive Rate Limiter
Implement a rate limiter that adjusts limits based on system load and response times.
Exercise 4: Circuit Breaker
Create a circuit breaker with half-open state, configurable thresholds, and event logging.
Challenge: Stream Processor
Design a real-time stream processing system combining multiple patterns: fan-out for distribution, pipelines for transformation, and rate limiting for flow control.