Dealing with multiple workers - A Go layman approach

ANUPAM GOGOI
3 min read · Feb 4, 2022

Introduction

So, how do you deal with multiple workers (goroutines) writing to a single channel? Pictorially, the problem can be described by the following diagram.

Here is the situation:

I have a Go pipeline with two stages, Stage 1 & Stage 2.

Stage 1 has two workers (worker 1 & worker 2). Both workers are writing to an unbuffered channel.

Stage 2 also has two workers (worker 1 & worker 2). Both of them read from the same channel.

Now, how will Stage 2 read data from the channel, and how will it know when to stop reading?

Some Approaches

Super naive approach

We can range over a channel at any time, but the range loop ends only when the channel is closed.

So, in a naive way, we can close the channel in Stage 2 and then read its contents like this.

close(ch)
for r := range ch {
    fmt.Println(r)
}

Let's think about what will happen with this approach. Well, it will be a disaster. Why? Because a send on a closed channel panics in Go. The workers in Stage 1 will crash the program as soon as they try to send, and our pipeline will be useless right after the first run.
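To see concretely what closing from the receiving side does to a sender, here is a minimal sketch. The function name sendAfterClose is hypothetical and not part of the original code; the panic is recovered only so that the result can be printed.

```go
package main

import "fmt"

// sendAfterClose (hypothetical helper) closes the channel from the
// reading side and then lets a "Stage 1" worker try to send on it.
// It reports whether the send panicked.
func sendAfterClose() (panicked bool) {
	ch := make(chan int)
	close(ch) // Stage 2 closes the channel it is supposed to read from

	defer func() {
		// A send on a closed channel raises a runtime panic.
		if r := recover(); r != nil {
			panicked = true
		}
	}()

	ch <- 1 // a Stage 1 worker tries to send
	return false
}

func main() {
	fmt.Println("send panicked:", sendAfterClose()) // prints "send panicked: true"
}
```

In a real pipeline nobody recovers from this panic, so the whole program goes down with the first send.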

Naive approach

Maybe we can use the select statement provided by Go in an infinite loop, as shown below.

for {
    select {
    case data := <-ch:
        fmt.Println(data)
    }
}

Well, it will work. But there is a caveat. Note that it's an infinite loop: once Stage 1 has finished, the receive blocks forever and the goroutine running the loop never exits, so it leaks. When your code becomes complex and your system starts breaking, a leak like this makes the root cause very hard to find. Take my word for it.

Let's make the case for the approach that I describe in the next section.

A moderately naive approach

We all know the atomic variables in Go, right? So, what about keeping track of the number of finished workers? Here is the approach.

I have a fixed number of workers in each stage.

We keep an atomic counter variable and, whenever a worker in Stage 1 completes, we increment it.

Now, in the next stage, i.e. Stage 2, we still have a for loop with select, but with a little twist. Here is the twist.

for flag {
    select {
    case d := <-ch:
        fmt.Printf("Received:%d\n", d)
    default:
        c := atomic.LoadUint64(n)
        if c == NumOfWorkers {
            flag = false
        }
    }
}

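Note that I have declared a flag (boolean). In the default case of the select statement, I check the count of finished workers. If it equals the number of workers, we simply exit the loop. This is safe because the channel is unbuffered: a worker's send completes only once the value has been received, so when the counter reaches the number of workers there is nothing left to read.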
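The counter idea can be put together into a small runnable program. The following is only a sketch under my own assumptions, not the contents of the gist: the names producer, consume, runPipeline and numOfWorkers, and the values being sent, are all hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// numOfWorkers is a hypothetical fixed number of Stage 1 workers.
const numOfWorkers = 2

// producer is a Stage 1 worker: it sends `count` values on the
// unbuffered channel and then increments the shared atomic counter.
func producer(id, count int, ch chan<- int, done *uint64) {
	for i := 0; i < count; i++ {
		ch <- id*100 + i
	}
	atomic.AddUint64(done, 1)
}

// consume is a single Stage 2 worker: it keeps receiving and, whenever
// no value is ready, checks whether every producer has finished.
func consume(ch <-chan int, done *uint64) []int {
	var got []int
	flag := true
	for flag {
		select {
		case d := <-ch:
			got = append(got, d)
		default:
			if atomic.LoadUint64(done) == numOfWorkers {
				flag = false
			}
		}
	}
	return got
}

// runPipeline wires both stages together and returns what Stage 2 read.
func runPipeline(perWorker int) []int {
	ch := make(chan int) // unbuffered, as in the article
	var done uint64
	for w := 1; w <= numOfWorkers; w++ {
		go producer(w, perWorker, ch, &done)
	}
	return consume(ch, &done)
}

func main() {
	got := runPipeline(3)
	fmt.Println("received", len(got), "values") // prints "received 6 values"
}
```

One design consequence worth knowing: because of the default case, the Stage 2 loop spins instead of blocking while it waits for data, so it trades some CPU time for the ability to notice that the workers are done.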

The complete code is in the GitHub gist below. For simplicity, I let only one worker in Stage 2 read data. You can try adding more workers.

https://gist.github.com/anupamgogoi0907/37e5c448d9f4c52407c32094fc338068

Conclusion

Probably, this is the simplest way to deal with multiple workers writing to a channel. There are elaborate discussions on this topic in [1] and [2]. Do have a look.
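For comparison, a widely used alternative for the same problem is to let a separate goroutine wait for all writers with sync.WaitGroup and then close the channel, so that the reader can use a plain range loop. The sketch below is my own illustration of that pattern, not code taken from this article or from the references; the name fanIn and the values being sent are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn (hypothetical name) starts `workers` writers on one unbuffered
// channel and returns everything a single reader received.
func fanIn(workers, perWorker int) []int {
	ch := make(chan int)
	var wg sync.WaitGroup

	for w := 1; w <= workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done() // signal that this writer is finished
			for i := 0; i < perWorker; i++ {
				ch <- id*100 + i
			}
		}(w)
	}

	// The channel is closed exactly once, after every writer is done,
	// so no writer can ever send on a closed channel.
	go func() {
		wg.Wait()
		close(ch)
	}()

	var got []int
	for v := range ch { // ends when the closer goroutine closes ch
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println("received", len(fanIn(2, 3)), "values") // prints "received 6 values"
}
```

Here the reader blocks while waiting instead of polling a counter, at the cost of one extra goroutine whose only job is to close the channel.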

Happy Go learning!!!

[1] https://go101.org/article/channel-closing.html

[2] https://www.leolara.me/blog/closing_a_go_channel_written_by_several_goroutines/
