Oct 11, 2018

[Go][book] Concurrency in Go - Katherine Cox-Buday

Summary of reading:
Concurrency in Go - Katherine Cox-Buday

I agree with Eli Bendersky's view of this book: if you are coming from
the C++ arena, it can be considered a 'reference book' for some Go tricks.

Before delving into Go concurrency, an understanding of
the Go scheduler is a must.
Reference:
http://vsdmars.blogspot.com/2018/06/golang-go-scheduler.html
http://vsdmars.blogspot.com/2018/07/golangnote-analysis-of-go-runtime.html
HPX lib (C++)
Intel TBB (C++)

Understanding the memory model is also essential to concurrency, although in Go it is mostly hidden by its runtime and channels.
Reference:
http://vsdmars.blogspot.com/2018/09/concurrency-c-wrap-up-2018.html


For advanced concurrency programming, take a look at
The Art of Multiprocessor Programming - Maurice Herlihy & Nir Shavit
Reference:
http://vsdmars.blogspot.com/2016/01/multiprocessor-programming-types-of.html

Jotting down the reading notes that are fun/useful :-)

RWLock:
[C++]
std::shared_mutex

[Golang]
sync.RWMutex

[Python]
None in the standard library, so I wrote one for fun:
https://github.com/verbalsaintmars/python_util/tree/master/rwlock
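
For the Go entry, here is a minimal sketch of how sync.RWMutex is typically used: writers take Lock/Unlock, readers take RLock/RUnlock. The counter type and its inc/get methods are hypothetical names made up for this note, not something from the book.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical type used only for illustration.
type counter struct {
	mu sync.RWMutex
	m  map[string]int
}

// inc takes the write lock: it excludes both readers and other writers.
func (c *counter) inc(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[key]++
}

// get takes the read lock: many readers may hold it at the same time.
func (c *counter) get(key string) int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.m[key]
}

func main() {
	c := &counter{m: make(map[string]int)}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.inc("hits")
		}()
	}
	wg.Wait()
	fmt.Println(c.get("hits")) // prints 100
}
```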

Cond Variable:
[C++]
std::condition_variable

[Golang]
sync.NewCond
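
A rough sketch of sync.NewCond in use, assuming a tiny blocking queue; the queue type and its put/take methods are hypothetical. The point is the Wait-inside-a-loop shape, since Wait releases the lock while sleeping and re-acquires it before returning.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical blocking queue used only for illustration.
type queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// put appends a value and wakes one waiting goroutine, if any.
func (q *queue) put(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

// take blocks until an item is available.
func (q *queue) take() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	// Re-check the condition after every wakeup.
	for len(q.items) == 0 {
		q.cond.Wait() // unlocks q.mu while waiting, re-locks before returning
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := newQueue()
	go q.put(42)
	fmt.Println(q.take()) // prints 42
}
```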

sync.Once deadlock example:
The nested onceA.Do(initA) call waits for the outer one to return, which never happens.
var onceA, onceB sync.Once
var initB func()
initA := func() { onceB.Do(initB) }
initB = func() { onceA.Do(initA) }
onceA.Do(initA)

sync.Pool
sync.Pool can be the remedy for creating new instances of a type
inside a loop, e.g. a for loop.

An experienced engineer can spot this performance-hitting 'bad design'
by seeing new/make_shared/NewXXX etc. inside a for/while loop.

Reusing instances instead of letting them be GCed boosts performance;
however, instances stored in the pool should be stateless (or reset before reuse)
to avoid side effects.

The GC can still drain the Pool, so a .Get() on a drained Pool creates a new instance via the Pool's New function.
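
A minimal sketch of the reuse idea, assuming a pool of *bytes.Buffer; bufPool and render are hypothetical names. The buffer is reset right after Get so that no state leaks from the previous user.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers; New runs only when the pool is empty.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// render is a hypothetical function that needs a scratch buffer on every call.
func render(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()            // drop whatever the previous user left behind
	defer bufPool.Put(buf) // hand the buffer back for reuse
	buf.WriteString("hello, ")
	buf.WriteString(name)
	return buf.String()
}

func main() {
	for _, n := range []string{"a", "b", "c"} {
		fmt.Println(render(n))
	}
}
```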

Further intriguing read:
Purpose of sync.Pool 
Learn High Performance Go 

Close(Channel):
close(channel) acts as a broadcast signal: every goroutine blocked receiving on the channel continues.
e.g
https://play.golang.org/p/k3143TbiqO4
package main

import (
 "fmt"
 "time"
)

var ch = make(chan struct{})

func run(cnt int) {
 <-ch
 fmt.Println(cnt)
}

func main() {
 go run(1)
 go run(2)
 go run(3)
 go run(4)

 time.Sleep(10 * time.Second)
 close(ch)
 time.Sleep(10 * time.Second)
}

Select as a uniform random pick:
When several cases are ready, select chooses one of them uniformly at random, so it can act as a uniform random source.
e.g:
https://play.golang.org/p/2m4VB9rhfcU
package main
import "fmt"

func main() {
 c1 := make(chan interface{})
 close(c1)
 c2 := make(chan interface{})
 close(c2)

 var c1Count, c2Count int

 for i := 1000; i >= 0; i-- {
  select {
  case <-c1:
   c1Count++
  case <-c2:
   c2Count++
  }
 }
 fmt.Println(c1Count)
 fmt.Println(c2Count)
}

Daisy-chain (it's simply elegant):
From the slides: https://talks.golang.org/2012/concurrency.slide
package main

import "fmt"

func f(left, right chan int) {
    left <- 1 + <-right
}

func main() {
    const n = 10000
    leftmost := make(chan int)
    right := leftmost
    left := leftmost
    for i := 0; i < n; i++ {
        right = make(chan int)
        go f(left, right)
        left = right
    }
    go func(c chan int) { c <- 1 }(right)
    fmt.Println(<-leftmost)
}



Legacy Go code alert (prior to Go 1.5):
runtime.GOMAXPROCS(runtime.NumCPU())
Since Go 1.5, GOMAXPROCS defaults to the number of CPUs available on the machine,
so the code above is no longer necessary.
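
To check the default on your own machine, something like the following sketch should do; currentProcs is a hypothetical helper. Calling runtime.GOMAXPROCS with an argument below 1 does not change the setting, it only returns the current value.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentProcs queries the current GOMAXPROCS setting without changing it.
func currentProcs() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	fmt.Println("NumCPU:    ", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", currentProcs())
}
```

The two numbers can differ if the GOMAXPROCS environment variable is set.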


Channel error/result back to caller:
https://play.golang.org/p/aCowW5mNEp1
package main

import "fmt"

func main() {
 type result struct {
  stat int
 }

 CheckErr := func(input chan int) <-chan result {
  r := make(chan result)

  go func() {
   defer close(r)
   for i := range input {
    r <- result{i}
   }
  }()

  return r
 }

 input := make(chan int)

 go func() {
  defer close(input)
  for i := range [10]struct{}{} {
   input <- i
  }
 }()

 for result := range CheckErr(input) {
  fmt.Println(result)
 }

}
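
The listing above only sends plain results. A sketch of the same shape with an error travelling alongside the value might look like this; the result fields, errOdd, and check are hypothetical names, and the "odd input is an error" rule is just an assumption for the demo.

```go
package main

import (
	"errors"
	"fmt"
)

// result pairs a value with the error that may have occurred producing it.
type result struct {
	value int
	err   error
}

// errOdd is a made-up error for this sketch.
var errOdd = errors.New("odd input")

// check reports each input back to the caller, either as a value or as an error.
func check(inputs ...int) <-chan result {
	out := make(chan result)
	go func() {
		defer close(out)
		for _, v := range inputs {
			if v%2 != 0 {
				out <- result{err: errOdd}
				continue
			}
			out <- result{value: v}
		}
	}()
	return out
}

func main() {
	// The caller, which has the full context, decides what to do with errors.
	for r := range check(2, 3, 4) {
		if r.err != nil {
			fmt.Println("error:", r.err)
			continue
		}
		fmt.Println("ok:", r.value)
	}
}
```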

The or-channel: 
https://play.golang.org/p/XsDYdU8ks5D
package main

import (
 "time"
)

func main() {

 var or func(channels ...chan interface{}) <-chan interface{}

 or = func(channels ...chan interface{}) <-chan interface{} {
  switch len(channels) {
  case 0:
   return nil
  case 1:
   return channels[0]
  }

  orDone := make(chan interface{})

  go func() {
   defer close(orDone) // Trick is here :-)

   switch len(channels) {
   case 2:
    select {
    case <-channels[0]:
    case <-channels[1]:
    }
   default:
    select {
    case <-channels[0]:
    case <-channels[1]:
    case <-channels[2]:
    case <-or(append(channels[3:], orDone)...):
    }
   }
  }()
  return orDone
 }

 chs := make([]chan interface{}, 7)
 for i := range chs {
  chs[i] = make(chan interface{}, 1)
 }

 done := make(chan struct{})

 go func() {
  <-or(chs...)
  done <- struct{}{}
 }()
 time.Sleep(3 * time.Second)
 chs[0] <- struct{}{}
 <-done
}


Best Practices for Constructing Pipelines:
https://play.golang.org/p/4efPkGXUSoM
package main

import "fmt"

func main() {
 pipe := func(done chan struct{}, i ...int) chan int {
  retch := make(chan int)

  go func() {
   defer close(retch)
   for _, fi := range i { // range over the values, not the indices
    select {
    case <-done:
     return // stop early once done is closed
    case retch <- fi:
    }
   }
  }()

  return retch
 }

 done := make(chan struct{})
 for c := range pipe(done, 47, 38, 29) {
  fmt.Println(c)
 }
}


Fan-Out, Fan-In (auh, map-reduce):
https://play.golang.org/p/fmGAGFCuzDy
Fan-out:
Use for-range over the pipeline channel to fan out.

Take advantage of
var wg sync.WaitGroup
defer wg.Done()
wg.Wait()
to fan in.
package main

import "sync"

func main() {

 fanIn := func(done <-chan interface{}, channels ...<-chan interface{},
 ) <-chan interface{} {

  var wg sync.WaitGroup

  multiplexedStream := make(chan interface{})

  multiplex := func(c <-chan interface{}) {
   defer wg.Done()
   for i := range c {
    select {
    case <-done:
     return
    case multiplexedStream <- i:
    }
   }
  }

  wg.Add(len(channels))

  for _, c := range channels {
   go multiplex(c)
  }

  go func() {
   wg.Wait()
   close(multiplexedStream)
  }()

  return multiplexedStream
 }
 done := make(chan interface{})
 chan1 := make(chan interface{}, 1)
 chan2 := make(chan interface{}, 1)
 chan1 <- struct{}{}
 chan2 <- struct{}{}
 <-fanIn(done, chan1, chan2)
}


The or-done-channel:
https://play.golang.org/p/HsgZe8Ke9xU
Acts as a building block for other services.
package main

import (
 "fmt"
 "time"
)

func main() {

 orDone := func(done, c <-chan interface{}) <-chan interface{} {
  valStream := make(chan interface{})
  go func() {
   defer close(valStream)
   for {
    select {
    case <-done:
     return
    case v, ok := <-c:
     if ok == false {
      return
     }
     select {
     case valStream <- v:
     case <-done:
     }
    }
   }
  }()
  return valStream
 }
 ch := make(chan interface{}, 3)
 ch <- 1
 ch <- 2
 ch <- 3

 go func() {
  time.Sleep(3 * time.Second)
  close(ch)
 }()

 for v := range orDone(make(chan interface{}), ch) {
  fmt.Println(v)
 }
}


The tee-channel:
https://play.golang.org/p/bcbVnfmUdBT
Each input value is sent to both outputs, so the two outputs interleave.
package main

import (
 "fmt"
 "time"
)

func main() {

 orDone := func(done, c <-chan interface{}) <-chan interface{} {
  valStream := make(chan interface{})
  go func() {
   defer close(valStream)
   for {
    select {
    case <-done:
     return
    case v, ok := <-c:
     if ok == false {
      return
     }
     select {
     case valStream <- v:
     case <-done:
     }
    }
   }
  }()
  return valStream
 }

 done := make(chan interface{})
 ch := make(chan interface{}, 3)
 ch <- 1
 ch <- 2
 ch <- 3

 tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
  out1 := make(chan interface{})
  out2 := make(chan interface{})

  go func() {
   defer close(out1)
   defer close(out2)

   for val := range orDone(done, in) {
    var out1, out2 = out1, out2
    for i := 0; i < 2; i++ {
     select {
     case <-done:
      return
     case out1 <- val:
      out1 = nil // next loop out1 will be ignored.
     case out2 <- val:
      out2 = nil // ditto
     }
    }
   }
  }()
  return out1, out2
 }

 go func() {
  time.Sleep(3 * time.Second)
  close(done) // closing releases every receiver on done, not just one
 }()

 a, b := tee(done, ch)
 for v := range a {
  fmt.Printf("a val: %v\n", v)
  fmt.Printf("b val: %v\n", <-b)
 }
}



The bridge-channel:
https://play.golang.org/p/jvTDGZbFeZO
A channel of channels.
Each sub-channel is funneled into
a single channel for the caller to consume.

By now you can see that most of the implementations in the book
use unbuffered channels.

This is a lazy-evaluation technique: data is processed only as fast as
the caller consumes the results.

It is like 'yield' (i.e. coroutines, lightweight threads/green threads)
in major languages, e.g. C#, Python.
Reference:
[C++]
http://c9x.me/articles/gthreads/intro.html
package main

import (
 "fmt"
 "time"
)

func main() {
 orDone := func(done, c <-chan interface{}) <-chan interface{} {
  valStream := make(chan interface{})
  go func() {
   defer close(valStream)
   for {
    select {
    case <-done:
     return
    case v, ok := <-c:
     if ok == false {
      return
     }
     select {
     case valStream <- v:
     case <-done:
     }
    }
   }
  }()
  return valStream
 }

 bridge := func(
  done <-chan interface{},
  chanStream <-chan <-chan interface{},
 ) <-chan interface{} {

  valStream := make(chan interface{})

  go func() {
   defer close(valStream)

   for {
    var stream <-chan interface{}

    select {
    case maybeStream, ok := <-chanStream:
     if ok == false {
      return
     }
     stream = maybeStream
    case <-done:
     return
    }

    for val := range orDone(done, stream) {
     select {
     case valStream <- val:
     case <-done:
     }
    }
   }
  }()
  return valStream
 }

 done := make(chan interface{})
 base := make(chan interface{}, 1)
 next := make(chan (<-chan interface{}), 1)
 base <- 42
 next <- base

 go func() {
  time.Sleep(3 * time.Second)
  close(base)
  close(next)
 }()

 result := bridge(done, next)
 for i := range result {
  fmt.Println(i)
 }
}



Queuing:
Buffered channels are piped together and then run in sequence.
The receiving type and the returning type can be the same, so stages compose, forming something like a Monad.
Reference:
http://vsdmars.blogspot.com/2017/09/fp-functor-applicative-monoid-monad.html
http://vsdmars.blogspot.com/2016/08/fp-related-readstudy-links.html
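
A small sketch of the queuing idea, assuming int values throughout; stage and source are hypothetical helpers. Each stage takes a channel of int and returns a channel of int, so stages chain freely, and the buffer on the output channel is the queue.

```go
package main

import "fmt"

// source queues up the given values on a buffered channel and closes it.
func source(vals ...int) <-chan int {
	out := make(chan int, len(vals))
	for _, v := range vals {
		out <- v
	}
	close(out)
	return out
}

// stage applies f to every value; its buffered output lets it run ahead
// of a slower downstream stage by up to buf items.
func stage(in <-chan int, buf int, f func(int) int) <-chan int {
	out := make(chan int, buf)
	go func() {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

func main() {
	double := func(v int) int { return v * 2 }
	inc := func(v int) int { return v + 1 }

	// Same input and output type, so stages nest like function composition.
	for v := range stage(stage(source(1, 2, 3), 2, double), 2, inc) {
		fmt.Println(v) // prints 3, 5, 7
	}
}
```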



The context Package:
I'm already familiar with its usage, so I skimmed this section.

Timeouts and Cancellation:
Use the context timeout mechanism, passing the context instance
down the call stack.


Heartbeat:
https://play.golang.org/p/4RHoiIjnLLL
Use the channel returned by time.Tick(interval) https://golang.org/pkg/time/#Tick as the pulse source.
It can be used for periodic health probing.

time.After can be used in a select statement to detect that the worker has not reported within the timeout.
package main

import (
 "fmt"
 "time"
)

func main() {
 doWork := func(done <-chan interface{},
  pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) {

  heartbeat := make(chan interface{})
  results := make(chan time.Time)

  go func() {
   defer close(heartbeat)
   defer close(results)

   pulse := time.Tick(pulseInterval)
   workGen := time.Tick(2 * pulseInterval)

   sendPulse := func() {
    select {
    case heartbeat <- struct{}{}:
    default:
    }
   }

   sendResult := func(r time.Time) {
    for {
     select {
     case <-done:
      return
     case <-pulse:
      sendPulse()
     case results <- r:
      return
     }
    }
   }

   for {
    select {
    case <-done:
     return
    case <-pulse:
     sendPulse()
    case r := <-workGen:
     sendResult(r)
    }
   }
  }()

  return heartbeat, results
 }

 done := make(chan interface{})

 go func() {
  defer close(done)
  time.Sleep(3 * time.Second)
 }()

 h, r := doWork(done, 120*time.Second)

 for {
  select {
  case _, ok := <-h:
   if ok == false {
    return
   }
  case _, ok := <-r:
   if ok == false {
    return
   }
  case <-time.After(1 * time.Second):
   fmt.Println("worker goroutine is not healthy!")
   return
  }
 }
}

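Purpose of nil channel:
A receive from a nil channel blocks forever, so setting a closed channel to nil
disables its select case and avoids a busy loop on the closed channel.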
func merge(a, b <-chan int) <-chan int {
 c := make(chan int)
 go func() {
  defer close(c)
  for a != nil || b != nil {
   select {
   case v, ok := <-a:
    if !ok {
     fmt.Println("a is done")
     a = nil
     continue
    }
    c <- v
   case v, ok := <-b:
    if !ok {
     fmt.Println("b is done")
     b = nil
     continue
    }
    c <- v
   }
  }
 }()
 return c
}
