Data handling in concurrent programs

April 2, 2020 Map Sources


Go provides goroutines out of the box, so we can run code concurrently. However, concurrently running code can work with shared variables, and it is not obvious how Go handles such situations.

Let’s start with the “counter task” — we’ll try to increment a counter variable 200 times in multiple goroutines.

c := 0
wg := sync.WaitGroup{}
n := 200
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		c++
		wg.Done()
	}()
}
wg.Wait()

fmt.Println(c)

// 194

The resulting counter value differs from run to run and in most cases is not equal to 200. So this code is not thread-safe and doesn’t work as planned, even though there are no compiler or runtime errors.

Next case — we’ll try to insert 200 values into a slice in parallel and check if there are exactly 200 values.

c := []int{}
wg := sync.WaitGroup{}
n := 200
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		c = append(c, 1)
		wg.Done()
	}()
}
wg.Wait()

fmt.Println(len(c))

// 129

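The number of values in the slice is even farther from 200 than it was in the counter task. append reads the slice’s pointer, length, and capacity and then stores an updated slice back into c, so concurrent calls overwrite each other’s results. This code is also not thread-safe.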

Let’s try to insert 200 values into a map in parallel:

c := map[int]int{}
wg := sync.WaitGroup{}
n := 200
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		c[i] = i
		wg.Done()
	}(i)
}
wg.Wait()

fmt.Println(len(c))

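// fatal error: concurrent map writes

We can’t check the result because the program crashes. This is a fatal runtime error rather than an ordinary panic, so it cannot be caught with recover.

In all 3 tasks the code is broken, but only the map case reports the problem: the Go runtime checks for concurrent map writes on a best-effort basis and stops the program when it detects them.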

Race detection

Go has a tool for detecting such situations: the race detector.

One can run any example above with the -race flag, for example go test -race ./test.go. Go then reports the goroutines involved in the data race:

go test -race ./test.go

==================
WARNING: DATA RACE
Read at 0x00c0000a6070 by goroutine 9:
command-line-arguments.Test.func1()
/go/src/github.com/antelman107/go_blog/test.go:16 +0x38

Previous write at 0x00c0000a6070 by goroutine 8:
command-line-arguments.Test.func1()
/go/src/github.com/antelman107/go_blog/test.go:16 +0x4e

Goroutine 9 (running) created at:
command-line-arguments.Test()
/go/src/github.com/antelman107/go_blog/test.go:15 +0xe8
testing.tRunner()
/usr/local/Cellar/go/1.14/libexec/src/testing/testing.go:992 +0x1eb
--- FAIL: Test (0.01s)
testing.go:906: race detected during execution of test
FAIL
FAIL    command-line-arguments  0.025s
FAIL

Race detection is not limited to go test. One can also run, build, or install a program in race detection mode:

$ go test -race mypkg    // to test the package
$ go run -race .  // to run the source file
$ go build -race .   // to build the command
$ go install -race mypkg // to install the package

It is nice that one can directly detect data races in a program.

Even the popular “loop closure” issue can be detected:

wg := sync.WaitGroup{}
n := 10
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		fmt.Println(i)
		wg.Done()
	}()
}
wg.Wait()

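The issue here is that, before Go 1.22, all goroutines share the single loop variable i. The code is not guaranteed to print each of the numbers 0, 1, 2 … 9 exactly once: the output is unpredictable, values may repeat, and 10 may appear if a goroutine runs after the loop has finished. In modules that declare go 1.22 or later, each iteration gets its own copy of i, so the problem no longer occurs there.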
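One common way to avoid the shared loop variable is to pass i to the goroutine as an argument. The sketch below is only an illustration: the function name collectIndexes is hypothetical, and it collects the values into a slice instead of printing them so the result can be checked. The slice is guarded by a sync.Mutex, which is covered in the solution sections of this article.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collectIndexes (hypothetical helper) starts n goroutines and returns the
// loop index that each of them observed. The index is passed as an argument,
// so every goroutine works with its own copy.
func collectIndexes(n int) []int {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		seen []int
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done()
			mu.Lock()
			seen = append(seen, i)
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	// Goroutines finish in an arbitrary order, so sort before comparing.
	sort.Ints(seen)
	return seen
}

func main() {
	fmt.Println(collectIndexes(10)) // [0 1 2 3 4 5 6 7 8 9]
}
```

Each number from 0 to 9 appears exactly once, although the order in which the goroutines ran is still not defined.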

The solution

Let’s describe the solution for the counter task. This solution can be used for slice and map tasks.

So we have a counter value that is less than what we expect. Despite the brevity of the increment statement (c++), the program actually performs the following list of actions:

  1. read current counter value from memory,
  2. increment it,
  3. save the result to memory.
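The three steps above can be replayed by hand. The following sketch is not concurrent code: it is a single-goroutine simulation, with the hypothetical name lostUpdate, of an interleaving in which two workers both read the counter before either of them saves its result.

```go
package main

import "fmt"

// lostUpdate (hypothetical helper) replays, step by step, an interleaving in
// which two workers read the counter before either one writes it back.
func lostUpdate(start int) int {
	counter := start

	// Step 1: both workers read the current value from memory.
	readA := counter
	readB := counter

	// Step 2: each worker increments its private copy.
	readA++
	readB++

	// Step 3: both workers save the result; the second write
	// overwrites the first one with the same value.
	counter = readA
	counter = readB

	return counter
}

func main() {
	// Two increments were performed, but the counter grew by one.
	fmt.Println(lostUpdate(1)) // 2
}
```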

The issue happens because some goroutines read the same value of the counter before any of them has saved its result. Each of them then computes and stores the same incremented value, so one of the updates is lost. This behavior is shown in the diagram:

nonatomic.svg

  sequenceDiagram
    participant G1 as Goroutine 1
    participant C as Counter
    participant G2 as Goroutine 2
    
    G1->>C: Read value (1)
    G2->>C: Read value (1)
    G1->>G1: Calculate inc (1+1=2)
    G2->>G2: Calculate inc (1+1=2)
    G1->>C: Write value (2)
    G2->>C: Write value (2)
    Note over C: Final value is 2, not 3!

The more often goroutines read the same value, the further the final counter falls below 200.

The solution is to make the counter update atomic. Once a goroutine has read the counter value, the next operation on the counter must be the write from that same goroutine. No other goroutine should read or change the counter in the middle of that operation.

If we add some synchronization logic as described above, the diagram will look as follows:

atomic.svg

sync.Mutex/sync.RWMutex solution

We can use methods Lock and Unlock to guarantee that only one goroutine works with the counter at a time.

We can also use sync.RWMutex, which lets any number of readers hold the lock at the same time while writers still get exclusive access.
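As a minimal sketch of the read/write split, assuming a hypothetical Counter type that is not part of the original examples: writers take the exclusive lock with Lock, readers take the shared lock with RLock.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter is a hypothetical type used only for this illustration.
type Counter struct {
	mu sync.RWMutex
	n  int
}

// Inc takes the exclusive (write) lock.
func (c *Counter) Inc() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

// Value takes the shared (read) lock; many readers may hold it at once.
func (c *Counter) Value() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.n
}

// countWithReaders runs n writers and n readers concurrently and
// returns the final counter value.
func countWithReaders(n int) int {
	var c Counter
	var wg sync.WaitGroup
	wg.Add(2 * n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			c.Inc()
		}()
		go func() {
			defer wg.Done()
			_ = c.Value() // a concurrent read; the result is ignored here
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countWithReaders(200)) // 200
}
```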

But in our task, which only writes to the counter, a plain Mutex is enough:

c := 0
n := 200
m := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		m.Lock()
		c++
		m.Unlock()
		wg.Done()
	}(i)
}
wg.Wait()

fmt.Println(c)

// 200 == OK
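The same locking approach can be applied to the slice and map tasks. The sketch below is one possible arrangement, not the only one: the helper name fillConcurrently is hypothetical, and a single mutex guards both containers for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// fillConcurrently (hypothetical helper) appends to a slice and writes to a
// map from n goroutines, guarding both with one mutex.
func fillConcurrently(n int) ([]int, map[int]int) {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	values := []int{}
	index := map[int]int{}

	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			values = append(values, 1) // no append can interleave with another
			index[i] = i               // no concurrent map writes
		}(i)
	}
	wg.Wait()
	return values, index
}

func main() {
	values, index := fillConcurrently(200)
	fmt.Println(len(values), len(index)) // 200 200
}
```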

Channels solution

Channels are safe for concurrent use out of the box: multiple goroutines can send to the same channel without extra synchronization.

We can send any data into a channel with a single reader to provide sequential processing.

But to do that we need some additional code:

c := 0
n := 200

ch := make(chan struct{}, n)
chanWg := sync.WaitGroup{}
chanWg.Add(1)
go func() {
	for range ch {
		c++
	}
	chanWg.Done()
}()

wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		ch <- struct{}{}
		wg.Done()
	}(i)
}
wg.Wait()
close(ch)
chanWg.Wait()

fmt.Println(c)

// 200 = OK

We also used an empty struct here because it is the smallest data type in Go: a struct{} value occupies zero bytes.
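The size claim can be checked with unsafe.Sizeof. This is a small sketch; the helper name sizes is hypothetical, and bool is included only for comparison.

```go
package main

import (
	"fmt"
	"unsafe"
)

// sizes (hypothetical helper) returns the size in bytes of an empty struct
// value and, for comparison, of a bool value.
func sizes() (uintptr, uintptr) {
	var empty struct{}
	var flag bool
	return unsafe.Sizeof(empty), unsafe.Sizeof(flag)
}

func main() {
	empty, flag := sizes()
	fmt.Println(empty, flag) // 0 1
}
```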

Atomic package solution

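The standard library package sync/atomic provides a set of atomic operations.

Integer operations such as AddInt32 are implemented with atomic CPU instructions, so the read, the increment, and the write happen as one indivisible step.

The runtime_procPin / runtime_procUnpin functions also appear in the package sources, in the atomic.Value implementation. Pin keeps the current goroutine from being preempted until Unpin is called; it does not stop goroutines that run on other processors.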

We have several counter functions in the atomic package that help to implement our atomic counter:

c := int32(0)
n := 200

wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		atomic.AddInt32(&c, 1)
		wg.Done()
	}(i)
}
wg.Wait()

fmt.Println(c)

// 200 = OK
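Besides the Add functions, sync/atomic also offers compare-and-swap, which makes the read, increment, and save sequence explicit. The following is a sketch of that alternative rather than a recommendation over AddInt32; the helper names casIncrement and countWithCAS are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// casIncrement (hypothetical helper) reads the value, computes the increment
// and stores it only if nobody changed the value in between; otherwise it
// retries with a fresh read.
func casIncrement(addr *int32) {
	for {
		old := atomic.LoadInt32(addr)
		if atomic.CompareAndSwapInt32(addr, old, old+1) {
			return
		}
	}
}

// countWithCAS increments a counter from n goroutines using casIncrement.
func countWithCAS(n int) int32 {
	var c int32
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			casIncrement(&c)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&c)
}

func main() {
	fmt.Println(countWithCAS(200)) // 200
}
```

A goroutine whose swap fails has detected exactly the situation from the counter task, a stale read, and simply starts over instead of overwriting someone else’s update.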

One can encounter the atomic data change issue in many development situations. For example, the same issue happens with SELECT + UPDATE queries issued against an SQL database from multiple processes.
