Concurrency is one of those things that's easy to get wrong, even with Go's concurrency features. Let's review the things you should consider when writing concurrent production code.
The guide is split into three parts, each with a different purpose. First, we'll talk about "Rules of Thumb," which are usually the right thing to do. The second part is on what to use for writing concurrent code. And finally, we'll cover how to write your custom concurrency primitives.
Before we start, I should mention that many of these recommendations have exceptions where they are not the best choice; the main ones are performance-critical code and prototyping.
I've often seen people use concurrency where it shouldn't be used. It should go without saying: don't add concurrency unless you have a good reason.
var wg sync.WaitGroup
wg.Add(1)
go serve(&wg)
wg.Wait()
❯
serve()
The concurrency here is entirely unnecessary, but I've seen this exact code in a repository. A system without concurrency is much easier to debug, test, and understand.
People also add concurrency because they think it will speed up their program. In a production environment, you are handling many concurrent requests anyway, so making one part concurrent doesn't necessarily make the whole system faster.
There are two excellent Go features that help you shake out concurrency bugs from your code.
The first is the -race flag, which enables the race detector to flag observed data races. It can be used with go test -race ./... or go build -race ./yourproject. See the Data Race Detector documentation for more details.
Second, mark your tests with t.Parallel():
func TestServer(t *testing.T) {
	t.Parallel()
	// ...
}
This makes your tests run in parallel, which can speed them up, but it also means you are more likely to find a hidden shared state that doesn't work correctly in concurrent code. In addition to finding bugs in our codebases, we've also found them in third-party libraries.
Avoid global variables such as caches, loggers, and databases.
For example, it's relatively common for people to use log.Println inside their service, and then the test output ends up in the wrong location.
func TestAlpha(t *testing.T) {
	t.Parallel()
	log.Println("Alpha")
}

func TestBeta(t *testing.T) {
	t.Parallel()
	log.Println("Beta")
}
The output from go test -v will look like:
=== RUN TestAlpha
=== PAUSE TestAlpha
=== RUN TestBeta
=== PAUSE TestBeta
=== CONT TestAlpha
=== CONT TestBeta
2022/07/24 10:59:06 Alpha
--- PASS: TestAlpha (0.00s)
2022/07/24 10:59:06 Beta
--- PASS: TestBeta (0.00s)
PASS
ok test.test 0.213s
Notice how the "Alpha" and "Beta" are out of place. The code under test should call t.Log for any testing needs; then, the log lines will appear in the correct location. There's no way to make it work with a global logger.
Similarly, it's relatively common for people to start goroutines without waiting for them to finish. The go keyword makes starting goroutines very easy; however, it's not as apparent that you must also wait for them to stop.
go ListenHTTP(ctx)
go ListenGRPC(ctx)
go ListenDebugServer(ctx)
select{}
❯
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
	return ListenHTTP(ctx)
})
g.Go(func() error {
	return ListenGRPC(ctx)
})
g.Go(func() error {
	return ListenDebugServer(ctx)
})
err := g.Wait()
When you don't know when things stop, you don't know when it's safe to close your connections, databases, or log files. For example, a stray goroutine might use an already closed database and cause a panic.
Similarly, when you wait for all goroutines to finish, you can detect scenarios when one of the goroutines has become indefinitely blocked.
The next common issue is not handling context cancellation. It usually won't be a problem in the production system itself. It's more of an annoyance during testing and development. Let's imagine you have a time.Sleep somewhere in your code:
time.Sleep(time.Minute)
❯
tick := time.NewTimer(time.Minute)
defer tick.Stop()

select {
case <-tick.C:
case <-ctx.Done():
	return ctx.Err()
}
time.Sleep cannot react to anything, which means that when you press Ctrl-C on your keyboard, the program will stay on that line until the sleep finishes. This can increase your test times due to some services shutting down slowly, or make your servers much slower to shut down during upgrades.
The code for the cancellable wait is much longer, but we can write helpers to simplify it.
The other scenario where this cancellation comes up is long calculations:
for _, f := range files {
	data, err := os.ReadFile(f)
	// ...
}
❯
for _, f := range files {
	if err := ctx.Err(); err != nil {
		return err
	}

	data, err := os.ReadFile(f)
	// ...
}
Here we introduce a ctx.Err() call to check whether the context has been cancelled. Note that ctx.Err() is guaranteed to be concurrency safe, and it's not necessary to check ctx.Done() separately.
People coming from other languages often resort to creating worker pools. It's one of those tools that's necessary when you are working with threads instead of goroutines.
There are many reasons to not use worker pools:
They make stack traces harder to read. You'll end up having hundreds of goroutines that are on standby.
They use resources even if they are not working.
They can be slower than spawning a new goroutine.
You can replace your worker pools with a goroutine limiter: something that prevents more than N goroutines from running at the same time.
var wg sync.WaitGroup
defer wg.Wait()

queue := make(chan string, 8)
for k := 0; k < 8; k++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for work := range queue {
			process(work)
		}
	}()
}

for _, work := range items {
	queue <- work
}
close(queue)
❯
var wg sync.WaitGroup
defer wg.Wait()

limiter := make(chan struct{}, 8)
for _, work := range items {
	work := work
	wg.Add(1)
	limiter <- struct{}{}
	go func() {
		defer wg.Done()
		defer func() { <-limiter }()

		process(work)
	}()
}
We'll later show how to make a limiter primitive easier to use.
Polling another system is rather wasteful of resources. It's usually better to use some channel or signal to message the other side:
lastKnown := 0
for {
	time.Sleep(time.Second)

	t.mu.Lock()
	if lastKnown != t.current {
		process(t.current)
		lastKnown = t.current
	}
	t.mu.Unlock()
}
❯
lastKnown := 0
for newState := range t.updates {
	if lastKnown != newState {
		process(newState)
		lastKnown = newState
	}
}
Polling wastes resources when the update rate is low, and it responds to changes more slowly than direct notification. There are many ways to avoid polling, which could be a separate article altogether.
Of course, if you are making an external request and the external API is out of your control, you might not have any other choice than to poll.
It's easy to forget a mu.Unlock, wg.Wait, or close(ch). If you always defer them, it's much easier to see when they are missing.
for _, item := range items {
	service.mu.Lock()
	service.process(item)
	service.mu.Unlock()
}
❯
for _, item := range items {
	func() {
		service.mu.Lock()
		defer service.mu.Unlock()

		service.process(item)
	}()
}
Even if the initial code is correct, a later modification can introduce a bug. For example, adding a return inside the loop after mu.Lock() would leave the mutex locked.
When it comes to writing production code, it's a bad idea to use certain concurrency primitives directly in your code. They can be error-prone and make the code much harder to reason about.
When choosing primitives, prefer higher-level ones over lower-level ones: roughly, errgroup and purpose-built helpers before sync.WaitGroup and sync.Mutex, and those before raw channels and bare go statements. Let's go through the common options, starting with the bare go statement.
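For example, it's common to see an HTTP handler fire off a background operation with a bare go statement. Here is a hypothetical sketch of such code (the Server type, its db field, InsertVisit, and visitFromRequest are made up for illustration):
func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	visit := visitFromRequest(r)

	// Nothing waits for this goroutine, and it inherits the request context.
	go server.db.InsertVisit(r.Context(), visit)

	w.WriteHeader(http.StatusOK)
}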
Notice that there's no guarantee that the goroutine finishes before the database is closed. This can introduce weird test failures, where you try to insert into an already closed database.
Similarly, there's another bug: r.Context() can be cancelled prematurely, since the request context is cancelled as soon as the handler returns or the client disconnects. Of course, this depends on the problem specifics, but when you start a background operation from your handler, you usually don't want the user to be able to cancel it.
One of the solutions for managing started goroutines is sync.WaitGroup. However, it also has quite a few problematic scenarios.
Let's take a look at the first common mistake with sync.WaitGroup:
func processConcurrently(items []*Item) {
	var wg sync.WaitGroup
	defer wg.Wait()

	for _, item := range items {
		item := item
		go func() {
			process(&wg, item)
		}()
	}
}

func process(wg *sync.WaitGroup, item *Item) {
	wg.Add(1)
	defer wg.Done()
	// ...
}
Here the problem is that processConcurrently can return before wg.Add is called, which means we don't wait for all the goroutines to finish.
The other scenario comes up when people incrementally change code:
func processConcurrently(items []*Item) {
	var wg sync.WaitGroup
	wg.Add(len(items))
	defer wg.Wait()

	for _, item := range items {
		item := item
		if filepath.Ext(item.Path) != ".go" {
			continue
		}
		go func() {
			defer wg.Done()
			process(item)
		}()
	}
}
Notice how we moved the wg.Add and wg.Done calls out of process, making the concurrency easier to follow. However, due to the extra if filepath.Ext check, the code is now wrong: wg.Add counts every item, but skipped items never call wg.Done, so wg.Wait blocks forever. That check was probably added by someone else at a later time, and it's one of those cases where tests can easily miss the problem.
To fully fix the code, it should look like this:
func processConcurrently(items []*Item) {
	var wg sync.WaitGroup
	defer wg.Wait()

	for _, item := range items {
		item := item
		if filepath.Ext(item.Path) != ".go" {
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			process(item)
		}()
	}
}
If you don't see the following parts together when someone is using sync.WaitGroup, then the code probably has a subtle error somewhere: wg.Add(1) immediately before starting the goroutine, defer wg.Done() as the first statement inside it, and a wg.Wait() (ideally deferred) that is always reached.
Instead of sync.WaitGroup, there's a better alternative that avoids many of these issues:
func processConcurrently(items []*Item) error {
	var g errgroup.Group
	for _, item := range items {
		item := item
		if filepath.Ext(item.Path) != ".go" {
			continue
		}
		g.Go(func() error {
			return process(item)
		})
	}
	return g.Wait()
}
errgroup.Group can be used in two ways:
// on failure, waits for the other
// goroutines to stop on their own
var g errgroup.Group
g.Go(func() error {
	return publicServer.Run(ctx)
})
g.Go(func() error {
	return grpcServer.Run(ctx)
})
err := g.Wait()
// on failure, cancels other goroutines
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
	return publicServer.Run(ctx)
})
g.Go(func() error {
	return grpcServer.Run(ctx)
})
err := g.Wait()
You can read the golang.org/x/sync/errgroup documentation for additional information. Note that errgroup also allows limiting the number of goroutines running concurrently.
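For example, newer versions of the package let you cap concurrency directly on the group via SetLimit (check the package documentation for when it was introduced):
var g errgroup.Group
g.SetLimit(8) // at most 8 goroutines run at once; further g.Go calls block until a slot frees up
for _, item := range items {
	item := item
	g.Go(func() error {
		return process(item)
	})
}
err := g.Wait()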
Mutex is definitely a useful primitive; however, you should be careful when using it. I've often seen code that looks like this:
func (cache *Cache) Add(ctx context.Context, key, value string) {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	cache.evictOldItems()
	cache.items[key] = entry{
		expires: time.Now().Add(time.Second),
		value:   value,
	}
}
You might wonder what the problem here is; it's appropriately locking and unlocking. The main problem is the call to cache.evictOldItems, combined with the fact that the method doesn't handle context cancellation. This means that requests can end up blocked behind cache.mu.Lock, and even if they are cancelled, you still have to wait for the mutex before you can return.
Instead, you can use a chan *state, which allows you to handle context cancellation properly:
type Cache struct {
	state chan *state
}

func NewCache() *Cache {
	content := make(chan *state, 1)
	content <- &state{}
	return &Cache{state: content}
}

func (cache *Cache) Add(ctx context.Context, key, value string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case state := <-cache.state:
		defer func() { cache.state <- state }()

		state.evictOldItems()
		state.items[key] = entry{
			expires: time.Now().Add(time.Second),
			value:   value,
		}
		return nil
	}
}
Even though the evictOldItems call is still there, it won't prevent other callers of Add from cancelling their requests.
Use sync.Mutex only for cases where you need to hold the lock for a short duration. Roughly, this means the code under the lock is O(N) or better, with a small N.
sync.RWMutex has all the same problems as sync.Mutex; however, it can also be significantly slower. It also makes it easy to introduce data races by writing to shared variables while holding only the read lock.
In your specific scenario, you should have benchmarks demonstrating that sync.RWMutex is faster than sync.Mutex.
Details: when there are a lot of readers and no writers, there is cache contention between the readers, because taking a read lock still mutates the mutex state, which does not scale. A writer attempting to grab the lock blocks future readers from acquiring it, so long-lived readers combined with infrequent writers cause long stretches where no work gets done.
Either way, you should be able to demonstrate that your use of sync.RWMutex is helpful.
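As a starting point, such a benchmark can compare the two locks on your actual read path with b.RunParallel. This is only a minimal sketch, with a guarded map standing in for the real data structure:
func BenchmarkReadMutex(b *testing.B) {
	var mu sync.Mutex
	data := map[int]int{1: 1}
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			mu.Lock()
			_ = data[1]
			mu.Unlock()
		}
	})
}

func BenchmarkReadRWMutex(b *testing.B) {
	var mu sync.RWMutex
	data := map[int]int{1: 1}
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			mu.RLock()
			_ = data[1]
			mu.RUnlock()
		}
	})
}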
Channels are valuable things in the Go language but are also error-prone. There are many ways to write bugs with them:
const workerCount = 100

var wg sync.WaitGroup
workQueue := make(chan *Item)

defer wg.Wait()
for i := 0; i < workerCount; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for item := range workQueue {
			process(item)
		}
	}()
}

err := db.IterateItems(ctx, func(item *Item) {
	workQueue <- item
})
This is probably one of the most common mistakes: forgetting to close the channel. Because workQueue is never closed, the worker goroutines never exit their range loops, and wg.Wait blocks forever. Channels also make the code harder to review compared to using higher-level primitives.
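One possible fix, sketched under the assumption that db.IterateItems reports its own errors, is to close the queue once iteration is done and to make the send respect cancellation:
err := db.IterateItems(ctx, func(item *Item) {
	select {
	case workQueue <- item:
	case <-ctx.Done():
		// The item is dropped here; ideally IterateItems itself would
		// also stop iterating once the context is cancelled.
	}
})
// Closing the queue lets the workers' range loops finish,
// which in turn lets wg.Wait() return.
close(workQueue)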
Using chan for communicating between different "goroutine processes" in your application is fine; however, ensure that you handle context cancellations and shut down properly. Otherwise, it's easy to introduce a deadlock.
I've come to the conclusion that you should avoid these in your domain logic:
make(chan X, N)
go func()
sync.WaitGroup
They are error-prone, and there are better approaches. It's clearer to write your own higher-level abstraction for your domain logic. Of course, having them isn't an "end-of-the-world" issue either.
I should separately note that using "select" is usually fine.
Finally, let's write a few helpers of our own, as promised earlier. The first one is for sleeping: we need to react to context cancellation so that we don't keep waiting long after the operation has been cancelled, and with a small helper, the call ends up not much longer than time.Sleep itself.
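A minimal sketch of such a helper could look like this (the name Sleep and its exact signature are our own choice):
func Sleep(ctx context.Context, duration time.Duration) error {
	t := time.NewTimer(duration)
	defer t.Stop()

	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
It is then used as:
if err := Sleep(ctx, time.Minute); err != nil {
	return err
}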
The next helper is the goroutine limiter we promised to make easier to use.
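A minimal sketch of such a Limiter, assuming the semantics implied by the usage below (Go blocks while the limit is reached and returns false if the context is cancelled first), could be:
type Limiter struct {
	limit chan struct{}
	wg    sync.WaitGroup
}

func NewLimiter(n int) *Limiter {
	return &Limiter{limit: make(chan struct{}, n)}
}

// Go starts fn in a goroutine, blocking while n goroutines are already running.
// It reports false when ctx is cancelled before the goroutine could be started.
func (lim *Limiter) Go(ctx context.Context, fn func()) bool {
	select {
	case lim.limit <- struct{}{}:
	case <-ctx.Done():
		return false
	}

	lim.wg.Add(1)
	go func() {
		defer func() {
			<-lim.limit
			lim.wg.Done()
		}()
		fn()
	}()
	return true
}

// Wait waits for all started goroutines to finish.
func (lim *Limiter) Wait() {
	lim.wg.Wait()
}
This primitive is then used the same way as errgroup.Group: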
lim := NewLimiter(8)
defer lim.Wait()

for _, item := range items {
	item := item
	started := lim.Go(ctx, func() {
		process(item)
	})
	if !started {
		return ctx.Err()
	}
}
Of course, if your limited goroutines are dependent on each other, then it can introduce a deadlock.
Note that there's a potential "bug" with using such a Limiter. You must not call limiter.Go after you have called limiter.Wait, otherwise the goroutine can be started after limiter.Wait has returned. This can also happen with sync.WaitGroup and errgroup.Group. One way to avoid this problem is to disallow starting goroutines after limiter.Wait has been called. It probably makes sense to rename it to "limiter.Close" in that case.
Sometimes for testing, you need to start multiple goroutines and wait for all of them to complete. You can use errgroup for it; however, we can write a utility that makes it shorter:
func Concurrently(fns ...func() error) error {
	var g errgroup.Group
	for _, fn := range fns {
		g.Go(fn)
	}
	return g.Wait()
}
A test can use it this way:
err := Concurrently(
	func() error {
		if v := cache.Get(123); v == nil {
			return errors.New("expected value for 123")
		}
		return nil
	},
	func() error {
		if v := cache.Get(256); v == nil {
			return errors.New("expected value for 256")
		}
		return nil
	},
)
if err != nil {
	t.Fatal(err)
}
There are many variations of this. Should the function take ctx as an argument and pass it to the child goroutines? Should it cancel all the other functions via context cancellations when one error occurs?
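For example, a variant that passes a context to each function and cancels the rest when one of them fails could look like this (a sketch; whether you want these semantics depends on the test):
func ConcurrentlyCtx(ctx context.Context, fns ...func(context.Context) error) error {
	g, ctx := errgroup.WithContext(ctx)
	for _, fn := range fns {
		fn := fn
		g.Go(func() error {
			return fn(ctx)
		})
	}
	return g.Wait()
}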
Similarly, we quite often need to protect the state when concurrently modifying it. We've seen how sync.Mutex is sometimes error-prone and doesn't consider context cancellation. Let's write a helper for such a scenario.
type Locked[T any] struct {
	state chan *T
}

func NewLocked[T any](initial *T) *Locked[T] {
	s := &Locked[T]{}
	s.state = make(chan *T, 1)
	s.state <- initial
	return s
}

func (s *Locked[T]) Modify(ctx context.Context, fn func(*T) error) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}

	select {
	case state := <-s.state:
		defer func() { s.state <- state }()
		return fn(state)
	case <-ctx.Done():
		return ctx.Err()
	}
}
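As a usage sketch (the Config type and its Limit field are hypothetical), a caller modifies the protected state like this:
cfg := NewLocked(&Config{Limit: 10})

err := cfg.Modify(ctx, func(c *Config) error {
	c.Limit = 20
	return nil
})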
There are many resources that can help you delve deeper.
You can find quite a lot of our own custom primitives at storj.io/common/sync2. This package contains most of our synchronization primitives: things like Sleep and Concurrently, but also more advanced ones such as Cycle, ReadCache, and Throttle. We also have problem-specific implementations, Combiner and Queue, that implement a combiner queue. This primitive allows dialing storage nodes and coalescing multiple deletion requests into a single request.
One of the best talks about Go concurrency is "Rethinking Classical Concurrency Patterns" by Bryan C. Mills. He discusses problems with worker pools and sync.Cond in-depth.
If you struggle with understanding data races, the "Little Book of Semaphores" by Allen B. Downey is an excellent resource. It contains many classic problems and exercises to get your brain noticing them.
There has also been research on the topic: "Real-World Concurrency Bugs in Go" by Tengfei Tu et al. covers many additional issues not mentioned in this post.