Summary of reading:
Concurrency in Go - Katherine Cox-Buday
I agree with Eli Bendersky's view of this book: for readers coming
from the C++ arena, it can be considered
a 'reference book' for some golang tricks.
Before delving into golang concurrency, an understanding of
the golang scheduler is a must.
Reference:
http://vsdmars.blogspot.com/2018/06/golang-go-scheduler.html
http://vsdmars.blogspot.com/2018/07/golangnote-analysis-of-go-runtime.html
HPX lib (C++)
Intel TBB (C++)
Understanding the memory model is also essential to concurrency, although in golang this is mostly hidden by its runtime and channels.
Reference:
http://vsdmars.blogspot.com/2018/09/concurrency-c-wrap-up-2018.html
For advanced concurrency programming, take a look at
The Art of Multiprocessor Programming - Maurice Herlihy & Nir Shavit
Reference:
http://vsdmars.blogspot.com/2016/01/multiprocessor-programming-types-of.html
Jotting down reading notes on these is fun/useful :-)
RWLock:
[C++]
std::shared_mutex
[Golang]
sync.RWMutex
[Python]
None in the standard library, so I wrote one for fun:
https://github.com/verbalsaintmars/python_util/tree/master/rwlock
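For the Golang entry, here is a minimal sketch of how sync.RWMutex is typically used. The counter type and the runCounter helper are hypothetical names made up for this illustration: writers take the exclusive lock, readers take the shared one.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical type used only for this illustration.
type counter struct {
	mu sync.RWMutex
	n  int
}

// Inc takes the exclusive (write) lock.
func (c *counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// Value takes the shared (read) lock; many readers may hold it at once.
func (c *counter) Value() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.n
}

// runCounter increments the counter from several goroutines and returns the total.
func runCounter(writers int) int {
	var c counter
	var wg sync.WaitGroup
	wg.Add(writers)
	for i := 0; i < writers; i++ {
		go func() {
			defer wg.Done()
			c.Inc()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(runCounter(100)) // 100
}
```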
Cond Variable:
[C++]
std::condition_variable
[Golang]
sync.NewCond
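A rough sketch of the usual sync.Cond pattern, assuming a single waiter and a hypothetical waitForItems helper invented for this note: the waiter re-checks its condition in a loop around Wait, and each producer signals after changing the shared state.

```go
package main

import (
	"fmt"
	"sync"
)

// waitForItems (hypothetical helper) blocks until n producers have each
// appended one item to a shared queue, then returns the queue length.
func waitForItems(n int) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	var queue []int

	for i := 0; i < n; i++ {
		go func(v int) {
			mu.Lock()
			queue = append(queue, v)
			mu.Unlock()
			cond.Signal() // wake the waiter so it re-checks the condition
		}(i)
	}

	mu.Lock()
	for len(queue) < n { // always re-check the condition in a loop
		cond.Wait() // unlocks mu while sleeping and re-locks it on wake-up
	}
	got := len(queue)
	mu.Unlock()
	return got
}

func main() {
	fmt.Println(waitForItems(5)) // 5
}
```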
sync.Once dead lock example:
var onceA, onceB sync.Once
var initB func()
initA := func() { onceB.Do(initB) }
initB = func() { onceA.Do(initA) } // waits for the outer onceA.Do, which is still running
onceA.Do(initA)                    // deadlock
sync.Pool
sync.Pool can be a remedy for repeatedly creating new instances of a type
inside a loop, e.g. a for loop.
An experienced engineer can spot this performance-hurting design
by seeing new/make_shared/NewXXX etc. inside a for/while loop.
Reusing instances instead of leaving them to be GCed boosts performance;
however, the instances stored in the pool should be stateless
(or be reset before reuse) to avoid side effects.
The GC can still drain the Pool, so a .Get() on a drained Pool creates a new instance through the Pool's New function.
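As a small sketch of the idea, assuming a pool of *bytes.Buffer and a hypothetical render helper (both chosen only for this note), the buffer is reset on every Get so that state from a previous user cannot leak:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable *bytes.Buffer values; New runs only when the pool is empty.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// render (hypothetical helper) formats one record using a pooled buffer
// instead of allocating a new one on every call.
func render(id int) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset() // clear any state left by a previous user
	defer bufPool.Put(buf)
	fmt.Fprintf(buf, "record-%d", id)
	return buf.String()
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(render(i))
	}
}
```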
Further intriguing read:
Purpose of sync.Pool
Learn High Performance Go
Close(Channel):
close(channel) acts as a broadcast signal: every goroutine blocked receiving on the channel is released and continues.
e.g.
https://play.golang.org/p/k3143TbiqO4
package main

import (
	"fmt"
	"time"
)

var ch = make(chan struct{})

func run(cnt int) {
	<-ch // blocks until ch is closed
	fmt.Println(cnt)
}

func main() {
	go run(1)
	go run(2)
	go run(3)
	go run(4)
	time.Sleep(10 * time.Second)
	close(ch) // releases all four goroutines at once
	time.Sleep(10 * time.Second)
}
Select as uniform distribution seed:
When several cases are ready at the same time, select picks one of them uniformly at random, so it can act as a uniform distribution seed.
e.g.
https://play.golang.org/p/2m4VB9rhfcU
package main

import "fmt"

func main() {
	c1 := make(chan interface{})
	close(c1)
	c2 := make(chan interface{})
	close(c2)

	var c1Count, c2Count int
	for i := 1000; i >= 0; i-- {
		select {
		case <-c1:
			c1Count++
		case <-c2:
			c2Count++
		}
	}
	fmt.Println(c1Count)
	fmt.Println(c2Count)
}
Daisy-chain (it's simply elegant):
From the slides: https://talks.golang.org/2012/concurrency.slide
package main

import "fmt"

func f(left, right chan int) {
	left <- 1 + <-right
}

func main() {
	const n = 10000
	leftmost := make(chan int)
	right := leftmost
	left := leftmost
	for i := 0; i < n; i++ {
		right = make(chan int)
		go f(left, right)
		left = right
	}
	go func(c chan int) { c <- 1 }(right)
	fmt.Println(<-leftmost)
}
Legacy golang code alert (prior Go1.5):
runtime.GOMAXPROCS(runtime.NumCPU())
Since Go 1.5, the default value of GOMAXPROCS is
the number of CPUs on your machine,
so the code above isn't necessary anymore.
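If the current setting is still of interest, it can be read without being modified. A short sketch (currentProcs is a hypothetical helper name): calling runtime.GOMAXPROCS with an argument below 1 only reports the current value.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentProcs reports the current GOMAXPROCS value without changing it:
// an argument below 1 leaves the setting untouched.
func currentProcs() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	fmt.Println("GOMAXPROCS:", currentProcs())
	fmt.Println("NumCPU:    ", runtime.NumCPU())
}
```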
Channel error/result back to caller:
https://play.golang.org/p/aCowW5mNEp1
package main

import "fmt"

func main() {
	type result struct {
		stat int
	}

	CheckErr := func(input chan int) <-chan result {
		r := make(chan result)
		go func() {
			defer close(r)
			for i := range input {
				r <- result{i}
			}
		}()
		return r
	}

	input := make(chan int)
	go func() {
		defer close(input)
		for i := range [10]struct{}{} {
			input <- i
		}
	}()

	for res := range CheckErr(input) {
		fmt.Println(res)
	}
}
The or-channel:
https://play.golang.org/p/XsDYdU8ks5D
package main

import (
	"time"
)

func main() {
	var or func(channels ...chan interface{}) <-chan interface{}
	or = func(channels ...chan interface{}) <-chan interface{} {
		switch len(channels) {
		case 0:
			return nil
		case 1:
			return channels[0]
		}

		orDone := make(chan interface{})
		go func() {
			defer close(orDone)
			// Trick is here :-)
			switch len(channels) {
			case 2:
				select {
				case <-channels[0]:
				case <-channels[1]:
				}
			default:
				select {
				case <-channels[0]:
				case <-channels[1]:
				case <-channels[2]:
				case <-or(append(channels[3:], orDone)...):
				}
			}
		}()
		return orDone
	}

	chs := make([]chan interface{}, 7)
	for i := range chs {
		chs[i] = make(chan interface{}, 1)
	}

	done := make(chan struct{})
	go func() {
		<-or(chs...)
		done <- struct{}{}
	}()

	time.Sleep(3 * time.Second)
	chs[0] <- struct{}{}
	<-done
}
Best Practices for Constructing Pipelines:
https://play.golang.org/p/4efPkGXUSoM
package main

import "fmt"

func main() {
	pipe := func(done <-chan struct{}, nums ...int) <-chan int {
		retch := make(chan int)
		go func() {
			defer close(retch)
			for _, v := range nums { // range over the values, not the indices
				select {
				case <-done:
					return // stop producing once the caller is done
				case retch <- v:
				}
			}
		}()
		return retch
	}

	done := make(chan struct{})
	defer close(done)
	for c := range pipe(done, 47, 38, 29) {
		fmt.Println(c)
	}
}
Fan-Out, Fan-In (aha, map-reduce):
https://play.golang.org/p/fmGAGFCuzDy
Fan-out:
Start several goroutines that for-range over the same pipeline channel.
Fan-in:
Take advantage of
var wg sync.WaitGroup
defer wg.Done()
wg.Wait()
to close the multiplexed channel once every input channel has been drained.
package main

import "sync"

func main() {
	fanIn := func(
		done <-chan interface{},
		channels ...<-chan interface{},
	) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedStream := make(chan interface{})

		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				select {
				case <-done:
					return
				case multiplexedStream <- i:
				}
			}
		}

		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}

		go func() {
			wg.Wait()
			close(multiplexedStream)
		}()
		return multiplexedStream
	}

	done := make(chan interface{})
	chan1 := make(chan interface{}, 1)
	chan2 := make(chan interface{}, 1)
	chan1 <- struct{}{}
	chan2 <- struct{}{}
	<-fanIn(done, chan1, chan2)
}
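The sample above only exercises the fan-in half. As a hedged sketch of both halves together (sumSquares and its parameters are hypothetical, and it assumes workers is at least 1), several workers for-range over one input channel and a WaitGroup closes the merged output:

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares fans the inputs out to `workers` goroutines that all range over
// the same channel, then fans their results back in with a WaitGroup.
// Assumes workers >= 1.
func sumSquares(workers int, nums ...int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()

	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() { // fan-out: each value of in is received by exactly one worker
			defer wg.Done()
			for n := range in {
				out <- n * n
			}
		}()
	}
	go func() { // fan-in: close out once every worker has finished
		wg.Wait()
		close(out)
	}()

	total := 0
	for v := range out {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares(3, 1, 2, 3, 4)) // 30
}
```

Summing is used here because the order in which workers deliver results is not deterministic.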
The or-done-channel:
https://play.golang.org/p/HsgZe8Ke9xU
Acts as a building block for the other patterns.
package main

import (
	"fmt"
	"time"
)

func main() {
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if !ok {
						return
					}
					select {
					case valStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}

	ch := make(chan interface{}, 3)
	ch <- 1
	ch <- 2
	ch <- 3

	go func() {
		time.Sleep(3 * time.Second)
		close(ch)
	}()

	for v := range orDone(make(chan interface{}), ch) {
		fmt.Println(v)
	}
}
The tee-channel:
https://play.golang.org/p/bcbVnfmUdBT
Each input value is sent to both output channels, so the two outputs interleave.
package main

import (
	"fmt"
	"time"
)

func main() {
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if !ok {
						return
					}
					select {
					case valStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}

	done := make(chan interface{})
	ch := make(chan interface{}, 3)
	ch <- 1
	ch <- 2
	ch <- 3

	tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
		out1 := make(chan interface{})
		out2 := make(chan interface{})
		go func() {
			defer close(out1)
			defer close(out2)
			for val := range orDone(done, in) {
				var out1, out2 = out1, out2
				for i := 0; i < 2; i++ {
					select {
					case <-done:
						return
					case out1 <- val:
						out1 = nil // next loop out1 will be ignored.
					case out2 <- val:
						out2 = nil // ditto
					}
				}
			}
		}()
		return out1, out2
	}

	go func() {
		time.Sleep(3 * time.Second)
		close(done) // closing (rather than sending) signals every listener on done
	}()

	a, b := tee(done, ch)
	for v := range a {
		fmt.Printf("a val: %v\n", v)
		fmt.Printf("b val: %v\n", <-b)
	}
}
The bridge-channel:
https://play.golang.org/p/jvTDGZbFeZO
A channel that contains channels.
Each sub-channel is drained in turn into
a single output channel for the caller to consume.
By now you can see that most of the implementations in the book
use unbuffered (zero-capacity) channels.
This is a lazy-evaluation technique: data is processed only when
the caller consumes the processed data.
It is like 'yield' (i.e. coroutine, lightweight thread/green thread)
in major languages, e.g. C#, Python.
Reference:
[C++]
http://c9x.me/articles/gthreads/intro.html
package main

import (
	"fmt"
	"time"
)

func main() {
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if !ok {
						return
					}
					select {
					case valStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}

	bridge := func(
		done <-chan interface{},
		chanStream <-chan <-chan interface{},
	) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				var stream <-chan interface{}
				select {
				case maybeStream, ok := <-chanStream:
					if !ok {
						return
					}
					stream = maybeStream
				case <-done:
					return
				}
				for val := range orDone(done, stream) {
					select {
					case valStream <- val:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}

	done := make(chan interface{})
	base := make(chan interface{}, 1)
	next := make(chan (<-chan interface{}), 1)
	base <- 42
	next <- base

	go func() {
		time.Sleep(3 * time.Second)
		close(base)
		close(next)
	}()

	result := bridge(done, next)
	for i := range result {
		fmt.Println(i)
	}
}
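The 'yield'-like behaviour of unbuffered channels mentioned in this section can be sketched with a tiny generator. The names naturals and firstN are hypothetical, made up for this note. Because each send blocks until the caller receives, the producer runs at most one step ahead of the consumer.

```go
package main

import "fmt"

// naturals is a hypothetical generator: with an unbuffered channel, each send
// blocks until the caller receives, so the producer runs at most one step
// ahead of the consumer.
func naturals(done <-chan struct{}) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-done:
				return
			}
		}
	}()
	return out
}

// firstN pulls n values from the generator and then stops it via done.
func firstN(n int) []int {
	done := make(chan struct{})
	defer close(done)
	gen := naturals(done)
	vals := make([]int, 0, n)
	for i := 0; i < n; i++ {
		vals = append(vals, <-gen)
	}
	return vals
}

func main() {
	fmt.Println(firstN(5)) // [0 1 2 3 4]
}
```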
Queuing:
Buffered channels are piped together and then run in sequence.
The receiving type and the returning type can be the same, and thus form a Monad.
Reference:
http://vsdmars.blogspot.com/2017/09/fp-functor-applicative-monoid-monad.html
http://vsdmars.blogspot.com/2016/08/fp-related-readstudy-links.html
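A minimal sketch of that shape, under my own assumptions: stage, buffered and runQueue are hypothetical names and the buffer size of 2 is arbitrary. Every stage takes and returns the same channel type, so stages chain freely, and each stage's output is a buffered queue.

```go
package main

import "fmt"

// stage is a hypothetical name: it consumes and returns the same channel
// type, so stages can be chained in any order.
type stage func(<-chan int) <-chan int

// buffered wraps f in a stage whose output queue holds up to size pending values.
func buffered(size int, f func(int) int) stage {
	return func(in <-chan int) <-chan int {
		out := make(chan int, size)
		go func() {
			defer close(out)
			for v := range in {
				out <- f(v)
			}
		}()
		return out
	}
}

// runQueue feeds nums through the stages in order and collects the results.
func runQueue(nums []int, stages ...stage) []int {
	src := make(chan int, len(nums))
	for _, n := range nums {
		src <- n
	}
	close(src)

	var ch <-chan int = src
	for _, s := range stages {
		ch = s(ch)
	}
	var res []int
	for v := range ch {
		res = append(res, v)
	}
	return res
}

func main() {
	double := buffered(2, func(v int) int { return v * 2 })
	addOne := buffered(2, func(v int) int { return v + 1 })
	fmt.Println(runQueue([]int{1, 2, 3}, double, addOne)) // [3 5 7]
}
```

Each stage here is a single goroutine, so the input order is preserved.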
The context Package:
I am already familiar with its usage, so I skimmed over this section.
Timeouts and Cancellation:
Use the context timeout mechanism, passing the context instance
down through the call stack.
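A small sketch of that, with hypothetical function names (handle and slowOp) and arbitrary durations: the caller creates the deadline once with context.WithTimeout and the callee watches ctx.Done().

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowOp is a hypothetical leaf function: it gives up as soon as ctx is done.
func slowOp(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// handle is a hypothetical caller that sets the deadline once and passes
// ctx down the call stack.
func handle(timeout, work time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // release the timer even when slowOp returns early
	return slowOp(ctx, work)
}

func main() {
	err := handle(50*time.Millisecond, time.Second)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
	fmt.Println(handle(time.Second, 10*time.Millisecond)) // <nil>
}
```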
Heartbeat:
https://play.golang.org/p/4RHoiIjnLLL
Use time.Tick(interval) (https://golang.org/pkg/time/#Tick), which returns a channel that delivers a tick every interval.
It can be used for periodic probing checks.
time.After can be used inside a select statement to detect that the worker has not reported within the timeout.
package main

import (
	"fmt"
	"time"
)

func main() {
	doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) {
		heartbeat := make(chan interface{})
		results := make(chan time.Time)
		go func() {
			defer close(heartbeat)
			defer close(results)

			pulse := time.Tick(pulseInterval)
			workGen := time.Tick(2 * pulseInterval)

			sendPulse := func() {
				select {
				case heartbeat <- struct{}{}:
				default:
				}
			}
			sendResult := func(r time.Time) {
				for {
					select {
					case <-done:
						return
					case <-pulse:
						sendPulse()
					case results <- r:
						return
					}
				}
			}

			for {
				select {
				case <-done:
					return
				case <-pulse:
					sendPulse()
				case r := <-workGen:
					sendResult(r)
				}
			}
		}()
		return heartbeat, results
	}

	done := make(chan interface{})
	go func() {
		defer close(done)
		time.Sleep(3 * time.Second)
	}()

	h, r := doWork(done, 120*time.Second)
	for {
		select {
		case _, ok := <-h:
			if !ok {
				return
			}
		case _, ok := <-r:
			if !ok {
				return
			}
		case <-time.After(1 * time.Second):
			fmt.Println("worker goroutine is not healthy!")
			return
		}
	}
}
Purpose of nil channel:
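Avoid a busy loop: a closed channel is always ready to receive, so its select case would fire over and over, while a nil channel blocks forever and its case is never selected.
package main

import "fmt"

func merge(a, b <-chan int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for a != nil || b != nil {
			select {
			case v, ok := <-a:
				if !ok {
					fmt.Println("a is done")
					a = nil // disable this case from now on
					continue
				}
				c <- v
			case v, ok := <-b:
				if !ok {
					fmt.Println("b is done")
					b = nil // disable this case from now on
					continue
				}
				c <- v
			}
		}
	}()
	return c
}

func main() {
	a, b := make(chan int), make(chan int)
	go func() { defer close(a); a <- 1 }()
	go func() { defer close(b); b <- 2 }()
	for v := range merge(a, b) {
		fmt.Println(v)
	}
}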