Worker Pool to Limit Concurrent Goroutines
Recently, I needed to process a near-infinite number of messages asynchronously. You can’t just throw every message into its own goroutine when there are that many of them, with more always incoming. One solution is to use some sort of worker pool to limit the number of goroutines that can run concurrently. Here’s how I went about creating a worker pool (I’ve called it a task pool) in Go.
First, I defined a type for a function that returns an error:
type Task func() error
Then I created a struct to manage the work being done, using a weighted semaphore (from golang.org/x/sync/semaphore) to cap the number of concurrent tasks:
type TaskPool struct {
    max int                 // maximum number of concurrent tasks
    sem *semaphore.Weighted // counting semaphore guarding the task slots
}
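The semaphore here is the weighted semaphore from the golang.org/x/sync/semaphore package, so the file needs roughly these imports:

import (
    "context"

    "golang.org/x/sync/semaphore"
)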
I added a constructor function to create an instance of the struct with a max task limit:
// NewTaskPool creates a pool that allows at most max tasks to run concurrently.
func NewTaskPool(max int) *TaskPool {
    if max <= 0 {
        panic("max must be >= 1")
    }
    return &TaskPool{
        max: max,
        sem: semaphore.NewWeighted(int64(max)),
    }
}
The Run method takes a task and blocks until it can acquire a slot from the semaphore. A context is passed in so the blocking call can be cancelled. Once the semaphore is acquired with a weight of 1, the task is executed in a goroutine. When the goroutine finishes, the semaphore is released and the error channel is closed.
// Run blocks until a slot is available (or ctx is cancelled), then executes
// task in its own goroutine. Any error is sent on the returned channel, which
// is closed when the task finishes.
func (p *TaskPool) Run(ctx context.Context, task Task) <-chan error {
    errc := make(chan error, 1)
    if err := p.sem.Acquire(ctx, 1); err != nil {
        errc <- err
        close(errc)
        return errc
    }
    go func() {
        defer p.sem.Release(1)
        defer close(errc)
        if err := task(); err != nil {
            errc <- err
        }
    }()
    return errc
}
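For example, a caller might submit a task and then wait on the returned channel. This fragment is only a sketch: it assumes a pool and a ctx already exist in scope and that log is imported.

errc := pool.Run(ctx, func() error {
    // do the real work here and return any error encountered
    return nil
})

// errc is buffered with capacity 1 and closed when the task finishes, so a
// plain receive waits for the task and yields its error, if any.
if err := <-errc; err != nil {
    log.Printf("task failed: %v", err)
}

Because the channel is buffered, a caller that doesn’t care about the result can ignore the return value entirely without blocking the worker goroutine.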
I added a convenience method called Wait to block until all currently running tasks have finished executing. This is accomplished by acquiring all the available slots in the semaphore. After the semaphore has been fully acquired, no more tasks can be run. The semaphore is then completely released so it can be used again.
// Wait blocks until all currently running tasks have finished by acquiring
// every slot in the semaphore, then releases them all so the pool can be reused.
func (p *TaskPool) Wait() error {
    // acquire all available slots in the semaphore
    for i := 0; i < p.max; i++ {
        if err := p.sem.Acquire(context.Background(), 1); err != nil {
            return err
        }
    }
    // all tasks have completed; release the semaphore
    p.sem.Release(int64(p.max))
    return nil
}
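Putting it together, here’s a minimal sketch of the whole pattern, assuming the TaskPool code above lives in the same main package; the limit of 3 and the 20 fake messages are arbitrary choices for illustration.

package main

import (
    "context"
    "fmt"
    "log"
)

func main() {
    pool := NewTaskPool(3) // at most 3 tasks run at a time

    for i := 0; i < 20; i++ {
        i := i // capture the loop variable for the closure (pre-Go 1.22)
        // Run blocks when the pool is full, which gives natural backpressure.
        // The returned error channel is ignored here for brevity.
        pool.Run(context.Background(), func() error {
            fmt.Println("processing message", i)
            return nil
        })
    }

    // Block until every submitted task has finished.
    if err := pool.Wait(); err != nil {
        log.Fatal(err)
    }
}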
Hopefully this is helpful to someone else who wants to implement the same pattern. Full source can be found here. The async package has other methods for running tasks concurrently. I use them mostly for testing concurrent workflows, but they can provide value in production code as well.
Gopher artwork by Ashley McNamara