finished channel behavior example

This commit is contained in:
Jian Han 2018-01-02 15:26:09 +10:00
parent 6808953da8
commit abeeb049c5
2 changed files with 199 additions and 0 deletions

View File

@ -0,0 +1,106 @@
package main
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/davecgh/go-spew/spew"
)
func main() {
ch := make(chan string)
go func() {
p := <-ch
spew.Dump(p)
}()
ch <- "paper"
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
// fanout()
// selectDrop()
// waitForTasks()
withTimeOut()
}
func fanout() {
emps := 20
// ch := make(chan string, emps)
ch := make(chan string)
for e := 0; e < emps; e++ {
go func() {
time.Sleep(time.Duration(rand.Intn(10000)) * time.Millisecond)
ch <- "paper"
}()
}
for emps > 0 {
p := <-ch
fmt.Printf("EMP %s %d \n", p, emps)
emps--
}
}
func selectDrop() {
const cap = 5
ch := make(chan string, cap)
go func() {
for p := range ch {
fmt.Println("employee : received :", p)
}
}()
const work = 20
for w := 0; w < work; w++ {
select {
case ch <- "paper":
fmt.Println("manager : send ack")
default:
fmt.Println("manager : drop")
}
}
close(ch)
}
func waitForTasks() {
ch := make(chan string, 1)
defer close(ch)
go func() {
for p := range ch {
fmt.Println("employee : working :", p)
}
}()
const work = 10
for w := 0; w < work; w++ {
ch <- "paper"
}
}
func withTimeOut() {
duration := 50 * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()
ch := make(chan string, 1)
go func() {
time.Sleep(time.Duration(rand.Intn(20)) * time.Millisecond)
ch <- "paper"
}()
select {
case p := <-ch:
fmt.Println("work complete", p)
case <-ctx.Done():
fmt.Println("moving on")
}
}

View File

@ -0,0 +1,93 @@
package main
//Multiple functions can read from the same channel until that channel is closed;
// this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.
// A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels
// onto a single channel that's closed when all the inputs are closed. This is called fan-in.
import (
"fmt"
"sync"
)
/*
There's no formal definition of a pipeline in Go; it's just one of many kinds of concurrent programs. Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines
receive values from upstream via inbound channels
perform some function on that data, usually producing new values
send values downstream via outbound channels
Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage, the sink or consumer.
*/
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
//The merge function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies
// the values to the sole outbound channel. Once all the output goroutines have been started,
// merge starts one more goroutine to close the outbound channel after all sends on that channel are done.
// Sends on a closed channel panic, so it's important to ensure all sends are done before calling close.
// The sync.WaitGroup type provides a simple way to arrange this synchronization:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
//The first stage, gen, is a function that converts a list of integers to a channel that emits the integers in the list.
// The gen function starts a goroutine that sends the integers on the channel and closes the channel when all
// the values have been sent:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
//The second stage, sq, receives integers from a channel and returns a channel that emits the square of
// each received integer. After the inbound channel is closed and this stage has sent all the values downstream,
// it closes the outbound channel:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}