Summary of reading:
Concurrency in Go - Katherine Cox-Buday
I agree with Eli Bendersky's point of view about this book, i.e.
for readers coming from the C++ arena, the book
can be considered a 'reference book' for some golang tricks.
Before delving into golang concurrency, an understanding of the
golang scheduler is a must.
Reference:
http://vsdmars.blogspot.com/2018/06/golang-go-scheduler.html
http://vsdmars.blogspot.com/2018/07/golangnote-analysis-of-go-runtime.html
HPX lib (C++)
Intel TBB (C++)
Understanding the memory model is also essential to concurrency, although in golang this is mostly hidden by its runtime and channels.
Reference:
http://vsdmars.blogspot.com/2018/09/concurrency-c-wrap-up-2018.html
For advanced concurrency programming, take a look at
The Art of Multiprocessor Programming - Maurice Herlihy & Nir Shavit
Reference:
http://vsdmars.blogspot.com/2016/01/multiprocessor-programming-types-of.html
Jotting down reading notes for these is fun/useful :-)
RWLock:
[C++]
std::shared_mutex
[Golang]
sync.RWMutex
[Python]
None in the standard library, so I wrote one for fun:
https://github.com/verbalsaintmars/python_util/tree/master/rwlock
Cond Variable:
[C++]
std::condition_variable
[Golang]
sync.NewCond
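A minimal sketch of how sync.NewCond is typically used, assuming a hypothetical queue type (the names queue, put and get are mine, not from the book). The important part is re-checking the predicate in a loop around Wait():

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical FIFO guarded by a mutex and a condition variable.
type queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// put appends an item and wakes one waiting consumer.
func (q *queue) put(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

// get blocks until an item is available, then removes and returns it.
func (q *queue) get() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 { // re-check the predicate after every wake-up
		q.cond.Wait() // unlocks mu while sleeping, re-locks before returning
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := newQueue()
	go func() {
		for i := 1; i <= 3; i++ {
			q.put(i)
		}
	}()
	for i := 0; i < 3; i++ {
		fmt.Println(q.get())
	}
}
```

This mirrors the std::condition_variable idiom in C++: hold the lock, loop on the predicate, wait.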
sync.Once deadlock example:
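var onceA, onceB sync.Once
var initB func()
// initA runs inside onceA.Do and enters onceB.Do ...
initA := func() { onceB.Do(initB) }
// ... whose initB calls onceA.Do again. That call cannot return until the
// first onceA.Do (still in progress) returns, so the goroutine deadlocks.
initB = func() { onceA.Do(initA) }
onceA.Do(initA)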
sync.Pool
sync.Pool can be the panacea for creating new type instances
inside a loop, e.g. a for loop.
An experienced engineer can spot the performance hit of such a 'bad design'
by seeing new/make_shared/NewXXX etc. inside a for/while loop.
Reusing instances instead of letting them be GCed boosts performance;
however, the instances stored in the pool should be stateless
(or reset before reuse) to avoid side effects.
The GC can still drain the Pool, so a .Get() on a drained Pool falls back to the Pool's New function to create a new instance.
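A minimal sketch of the usual Get/Reset/Put cycle, assuming a hypothetical hot-path function render that needs a scratch buffer (bufPool and render are illustrative names, not from the book):

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable *bytes.Buffer values. New is called only when
// the pool has nothing to offer (first use, or after the GC drained it).
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// render is a hypothetical hot-path function that needs a scratch buffer.
func render(id int) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()            // wipe state left behind by a previous user
	defer bufPool.Put(buf) // hand the buffer back for reuse
	fmt.Fprintf(buf, "item-%d", id)
	return buf.String() // String copies, so the result survives the Put
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(render(i))
	}
}
```

The Reset() call is what keeps the pooled instance effectively stateless between users.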
Further intriguing read:
Purpose of sync.Pool
Learn High Performance Go
Close(Channel):
close(channel) acts as a broadcast signal: every goroutine blocked receiving on that channel is released and continues.
e.g.
https://play.golang.org/p/k3143TbiqO4
package main
import (
"fmt"
"time"
)
var ch = make(chan struct{})
func run(cnt int) {
<-ch
fmt.Println(cnt)
}
func main() {
go run(1)
go run(2)
go run(3)
go run(4)
time.Sleep(10 * time.Second)
close(ch)
time.Sleep(10 * time.Second)
}
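The same broadcast idea can be sketched without the sleeps by waiting on a sync.WaitGroup. The helper name releaseAll is an assumption for illustration only:

```go
package main

import (
	"fmt"
	"sync"
)

// releaseAll starts n goroutines that block on start, closes start once,
// and returns how many goroutines were released.
func releaseAll(n int) int {
	start := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	released := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start // blocks until close(start)
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}

	close(start) // a single close unblocks every receiver
	wg.Wait()
	return released
}

func main() {
	fmt.Println(releaseAll(4))
}
```

A receive on a closed channel returns immediately, so it does not matter whether a goroutine reaches <-start before or after the close.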
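Select as uniform distribution seed:
When several cases are ready at the same time, select picks one of them
pseudo-randomly with uniform probability, so it can act as a uniform distribution seed.
e.g.: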
https://play.golang.org/p/2m4VB9rhfcU
package main
import "fmt"
func main() {
c1 := make(chan interface{})
close(c1)
c2 := make(chan interface{})
close(c2)
var c1Count, c2Count int
for i := 1000; i >= 0; i-- {
select {
case <-c1:
c1Count++
case <-c2:
c2Count++
}
}
fmt.Println(c1Count)
fmt.Println(c2Count)
}
Daisy-chain (it's simply elegant):
From the slides: https://talks.golang.org/2012/concurrency.slide
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
const n = 10000
leftmost := make(chan int)
right := leftmost
left := leftmost
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
}
Legacy golang code alert (prior to Go 1.5):
runtime.GOMAXPROCS(runtime.NumCPU())
Since Go 1.5 the default value of GOMAXPROCS is
the number of CPUs on your machine,
so the above code isn't necessary anymore.
Channel error/result back to caller:
https://play.golang.org/p/aCowW5mNEp1
package main
import "fmt"
func main() {
type result struct {
stat int
}
CheckErr := func(input chan int) <-chan result {
r := make(chan result)
go func() {
defer close(r)
for i := range input {
r <- result{i}
}
}()
return r
}
input := make(chan int)
go func() {
defer close(input)
for i := range [10]struct{}{} {
input <- i
}
}()
for result := range CheckErr(input) {
fmt.Println(result)
}
}
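The example above only carries a status value. A sketch of the same idea with an explicit error field, so the caller (who has more context) decides what to do with failures. The worker halve and the helper summarize are hypothetical names:

```go
package main

import "fmt"

// result pairs a value with the error (if any) hit while producing it.
type result struct {
	value int
	err   error
}

// halve is a hypothetical worker: it rejects odd inputs and halves even ones.
// Errors travel down the same channel as successful results.
func halve(inputs ...int) <-chan result {
	out := make(chan result)
	go func() {
		defer close(out)
		for _, n := range inputs {
			if n%2 != 0 {
				out <- result{err: fmt.Errorf("odd input %d", n)}
				continue
			}
			out <- result{value: n / 2}
		}
	}()
	return out
}

// summarize drains the results; the caller decides how to treat errors.
func summarize(results <-chan result) (sum, failures int) {
	for r := range results {
		if r.err != nil {
			failures++
			continue
		}
		sum += r.value
	}
	return sum, failures
}

func main() {
	sum, failures := summarize(halve(2, 3, 8))
	fmt.Println(sum, failures)
}
```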
The or-channel:
https://play.golang.org/p/XsDYdU8ks5D
package main
import (
"time"
)
func main() {
var or func(channels ...chan interface{}) <-chan interface{}
or = func(channels ...chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone) // Trick is here :-)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}
chs := make([]chan interface{}, 7)
for i := range chs {
chs[i] = make(chan interface{}, 1)
}
done := make(chan struct{})
go func() {
<-or(chs...)
done <- struct{}{}
}()
time.Sleep(3 * time.Second)
chs[0] <- struct{}{}
<-done
}
Best Practices for Constructing Pipelines:
https://play.golang.org/p/4efPkGXUSoM
package main
import "fmt"
func main() {
pipe := func(done chan struct{}, i ...int) chan int {
retch := make(chan int)
go func() {
defer close(retch)
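for _, fi := range i { // range yields (index, value); send the value
select {
case <-done:
return // stop producing once the caller signals done
case retch <- fi: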
}
}
}()
return retch
}
done := make(chan struct{})
for c := range pipe(done, 47, 38, 29) {
fmt.Println(c)
}
}
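A sketch of chaining more than one stage, each taking the shared done channel plus an input channel and returning a new output channel. The stage names generator, multiply and collect are assumptions for illustration:

```go
package main

import "fmt"

// generator emits the given values, stopping early if done is closed.
func generator(done <-chan struct{}, values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			select {
			case <-done:
				return
			case out <- v:
			}
		}
	}()
	return out
}

// multiply is a stage: it reads from in, scales each value, and forwards it.
func multiply(done <-chan struct{}, in <-chan int, factor int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case <-done:
				return
			case out <- v * factor:
			}
		}
	}()
	return out
}

// collect drains the last stage into a slice.
func collect(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	done := make(chan struct{})
	defer close(done)
	fmt.Println(collect(multiply(done, generator(done, 47, 38, 29), 2)))
}
```

Every stage owns and closes its own output channel, so closing propagates downstream on its own.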
Fan-Out, Fan-In (ah, map-reduce):
https://play.golang.org/p/fmGAGFCuzDy
Fan-out:
Start several goroutines that for-range over the same pipeline channel.
Fan-in:
Take advantage of
var wg sync.WaitGroup
defer wg.Done()
wg.Wait()
to close the multiplexed channel once every input channel is drained.
package main
import "sync"
func main() {
fanIn := func(done <-chan interface{}, channels ...<-chan interface{},
) <-chan interface{} {
var wg sync.WaitGroup
multiplexedStream := make(chan interface{})
multiplex := func(c <-chan interface{}) {
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
go func() {
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
done := make(chan interface{})
chan1 := make(chan interface{}, 1)
chan2 := make(chan interface{}, 1)
chan1 <- struct{}{}
chan2 <- struct{}{}
<-fanIn(done, chan1, chan2)
}
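The code above only shows the fan-in half. A sketch of both halves together, assuming a hypothetical worker that squares its input (worker and sumSquares are illustrative names):

```go
package main

import (
	"fmt"
	"sync"
)

// worker squares every value it manages to pull from in.
func worker(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for v := range in {
		out <- v * v
	}
}

// sumSquares fans the inputs out to n workers and fans the results back in.
func sumSquares(n int, values ...int) int {
	in := make(chan int)
	out := make(chan int)

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go worker(in, out, &wg) // fan-out: n goroutines share one input channel
	}

	go func() { // feed the shared input, then signal "no more work"
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()

	go func() { // fan-in: close out once every worker has returned
		wg.Wait()
		close(out)
	}()

	total := 0
	for v := range out {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares(3, 1, 2, 3, 4))
}
```

The order in which results arrive is not deterministic, which is why this sketch reduces them with a commutative sum.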
The or-done-channel:
https://play.golang.org/p/HsgZe8Ke9xU
Wraps a channel so that ranging over it also honors done; it acts as a building block for the patterns that follow.
package main
import (
"fmt"
"time"
)
func main() {
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
ch := make(chan interface{}, 3)
ch <- 1
ch <- 2
ch <- 3
go func() {
time.Sleep(3 * time.Second)
close(ch)
}()
for v := range orDone(make(chan interface{}), ch) {
fmt.Println(v)
}
}
The tee-channel:
https://play.golang.org/p/bcbVnfmUdBT
Splits one input stream into two output streams; each value is delivered to both before the next one is read.
package main
import (
"fmt"
"time"
)
func main() {
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
done := make(chan interface{})
ch := make(chan interface{}, 3)
ch <- 1
ch <- 2
ch <- 3
tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(done, in) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
return
case out1 <- val:
out1 = nil // next loop out1 will be ignored.
case out2 <- val:
out2 = nil // ditto
}
}
}
}()
return out1, out2
}
go func() {
time.Sleep(3 * time.Second)
close(done) // close broadcasts to every goroutine selecting on done
}()
a, b := tee(done, ch)
for v := range a {
fmt.Printf("a val: %v\n", v)
fmt.Printf("b val: %v\n", <-b)
}
}
The bridge-channel:
https://play.golang.org/p/jvTDGZbFeZO
A channel that contains channels.
Each sub-channel is drained in turn into a single
output channel for the caller to consume.
By now you can see that most of the implementations in the book
use unbuffered channels.
This is a lazy-evaluation technique: data is processed only when
the caller consumes it.
Like 'yield' (i.e. coroutines, lightweight threads/green threads)
in major languages, e.g. C#, Python.
Reference:
[C++]
http://c9x.me/articles/gthreads/intro.html
package main
import (
"fmt"
"time"
)
func main() {
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
bridge := func(
done <-chan interface{},
chanStream <-chan <-chan interface{},
) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
if ok == false {
return
}
stream = maybeStream
case <-done:
return
}
for val := range orDone(done, stream) {
select {
case valStream <- val:
case <-done:
}
}
}
}()
return valStream
}
done := make(chan interface{})
base := make(chan interface{}, 1)
next := make(chan (<-chan interface{}), 1)
base <- 42
next <- base
go func() {
time.Sleep(3 * time.Second)
close(base)
close(next)
}()
result := bridge(done, next)
for i := range result {
fmt.Println(i)
}
}
Queuing:
Buffered channels are piped together and then run in sequence.
The receiving type and the returning type can be the same, so stages compose, which forms something Monad-like.
Reference:
http://vsdmars.blogspot.com/2017/09/fp-functor-applicative-monoid-monad.html
http://vsdmars.blogspot.com/2016/08/fp-related-readstudy-links.html
The context Package:
I am already familiar with its usage, so I skimmed over this section.
Timeouts and Cancellation:
Use the context timeout mechanism, passing the context instance
down through the call stack.
Heartbeat:
https://play.golang.org/p/4RHoiIjnLLL
time.Tick(interval) https://golang.org/pkg/time/#Tick returns a channel that delivers a tick every interval.
It can be used for periodic probing checks.
time.After can be used to check whether a select statement makes progress within the timeout, i.e. whether the worker is healthy.
package main
import (
"fmt"
"time"
)
func main() {
doWork := func(done <-chan interface{},
pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) {
heartbeat := make(chan interface{})
results := make(chan time.Time)
go func() {
defer close(heartbeat)
defer close(results)
pulse := time.Tick(pulseInterval)
workGen := time.Tick(2 * pulseInterval)
sendPulse := func() {
select {
case heartbeat <- struct{}{}:
default:
}
}
sendResult := func(r time.Time) {
for {
select {
case <-done:
return
case <-pulse:
sendPulse()
case results <- r:
return
}
}
}
for {
select {
case <-done:
return
case <-pulse:
sendPulse()
case r := <-workGen:
sendResult(r)
}
}
}()
return heartbeat, results
}
done := make(chan interface{})
go func() {
defer close(done)
time.Sleep(3 * time.Second)
}()
h, r := doWork(done, 120*time.Second)
for {
select {
case _, ok := <-h:
if ok == false {
return
}
case _, ok := <-r:
if ok == false {
return
}
case <-time.After(1 * time.Second):
fmt.Println("worker goroutine is not healthy!")
return
}
}
}
Purpose of nil channel:
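Avoid a busy loop: a receive on a closed channel returns immediately every time, while a receive on a nil channel blocks forever,
so setting a closed channel to nil disables its select case.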
func merge(a, b <-chan int) <-chan int {
c := make(chan int)
go func() {
defer close(c)
for a != nil || b != nil {
select {
case v, ok := <-a:
if !ok {
fmt.Println("a is done")
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok {
fmt.Println("b is done")
b = nil
continue
}
c <- v
}
}
}()
return c
}