From 0a1036f2c608ec0285301823e340cc02e3bdfc6f Mon Sep 17 00:00:00 2001 From: Jian Han Date: Mon, 8 Jan 2018 22:30:16 +1000 Subject: [PATCH] barrier pattern in the book added --- .../ch1/letter_channel_example.go | 10 +- .../mastering_concurrency_in_go/ch1/spider.go | 101 ++++++++++++++++++ .../mastering_concurrency_in_go/main.go | 3 +- .../concurrent_pattern/barrier/main.go | 69 ++++++++++++ .../concurrent_pattern/barrier/main_test.go | 15 +++ 5 files changed, 191 insertions(+), 7 deletions(-) create mode 100644 concurrency/mastering_concurrency_in_go/ch1/spider.go create mode 100644 go_design_pattern_book/concurrent_pattern/barrier/main.go create mode 100644 go_design_pattern_book/concurrent_pattern/barrier/main_test.go diff --git a/concurrency/mastering_concurrency_in_go/ch1/letter_channel_example.go b/concurrency/mastering_concurrency_in_go/ch1/letter_channel_example.go index 1437391..ddbc1fb 100644 --- a/concurrency/mastering_concurrency_in_go/ch1/letter_channel_example.go +++ b/concurrency/mastering_concurrency_in_go/ch1/letter_channel_example.go @@ -3,7 +3,6 @@ package ch1 import ( "fmt" "strings" - "sync" ) var initialString string @@ -11,8 +10,7 @@ var initialBytes []byte var stringLength int var finalString string var lettersProcessed int -var applicationStatus bool -var wg sync.WaitGroup +var applicationStatusL bool func getLetters(gQ chan string) { for i := range initialBytes { @@ -22,7 +20,7 @@ func getLetters(gQ chan string) { func capitalizeLetters(gQ chan string, sQ chan string) { for { if lettersProcessed >= stringLength { - applicationStatus = false + applicationStatusL = false break } select { @@ -34,7 +32,7 @@ func capitalizeLetters(gQ chan string, sQ chan string) { } } func RunLetter() { - applicationStatus = true + applicationStatusL = true getQueue := make(chan string) stackQueue := make(chan string) initialString = `Four score and seven years ago our fathers @@ -50,7 +48,7 @@ created equal.` close(getQueue) close(stackQueue) for { - if applicationStatus == false { + if applicationStatusL == false { fmt.Println("Done") fmt.Println(finalString) break diff --git a/concurrency/mastering_concurrency_in_go/ch1/spider.go b/concurrency/mastering_concurrency_in_go/ch1/spider.go new file mode 100644 index 0000000..50d8a82 --- /dev/null +++ b/concurrency/mastering_concurrency_in_go/ch1/spider.go @@ -0,0 +1,101 @@ +package ch1 + +import ( + "fmt" + "io/ioutil" + "net/http" + "sync" + "time" +) + +var applicationStatus bool +var urls []string +var urlsProcessed int +var foundUrls []string +var fullText string +var totalURLCount int +var wg sync.WaitGroup +var v1 int + +func readURLs(statusChannel chan int, textChannel chan string) { + time.Sleep(time.Millisecond * 1) + fmt.Println("Grabbing", len(urls), "urls") + for i := 0; i < totalURLCount; i++ { + fmt.Println("Url", i, urls[i]) + resp, _ := http.Get(urls[i]) + text, err := ioutil.ReadAll(resp.Body) + textChannel <- string(text) + if err != nil { + fmt.Println("No HTML body") + } + statusChannel <- 0 + } +} + +func addToScrapedText(textChannel chan string, processChannel chan bool) { + for { + select { + case pC := <-processChannel: + if pC == true { + // hang on + } + if pC == false { + close(textChannel) + close(processChannel) + } + case tC := <-textChannel: + fullText += tC + } + } +} + +func evaluateStatus(statusChannel chan int, textChannel chan string, processChannel chan bool) { + for { + select { + case status := <-statusChannel: + fmt.Print(urlsProcessed, totalURLCount) + urlsProcessed++ + if status == 0 { + fmt.Println("Got url") + } + if status == 1 { + close(statusChannel) + } + if urlsProcessed == totalURLCount { + fmt.Println("Read all top-level URLs") + processChannel <- false + applicationStatus = false + } + } + } +} + +func RunSpider() { + applicationStatus = true + statusChannel := make(chan int) + textChannel := make(chan string) + processChannel := make(chan bool) + totalURLCount = 0 + urls = append(urls, "http://www.mastergoco.com/index1.html") + urls = append(urls, "http://www.mastergoco.com/index2.html") + urls = append(urls, "http://www.mastergoco.com/index3.html") + urls = append(urls, "http://www.mastergoco.com/index4.html") + urls = append(urls, "http://www.mastergoco.com/index5.html") + fmt.Println("Starting spider") + urlsProcessed = 0 + totalURLCount = len(urls) + go evaluateStatus(statusChannel, textChannel, processChannel) + go readURLs(statusChannel, textChannel) + go addToScrapedText(textChannel, processChannel) + for { + if applicationStatus == false { + fmt.Println(fullText) + fmt.Println("Done!") + break + } + select { + case sC := <-statusChannel: + fmt.Println("Message on StatusChannel", sC) + } + } +} diff --git a/concurrency/mastering_concurrency_in_go/main.go b/concurrency/mastering_concurrency_in_go/main.go index d468ac4..ffd8cc0 100644 --- a/concurrency/mastering_concurrency_in_go/main.go +++ b/concurrency/mastering_concurrency_in_go/main.go @@ -4,5 +4,6 @@ import "github.com/jianhan/go-patterns/concurrency/mastering_concurrency_in_go/c func main() { // ch1.Run() - ch1.RunLetter() + // ch1.RunLetter() + ch1.RunSpider() } diff --git a/go_design_pattern_book/concurrent_pattern/barrier/main.go b/go_design_pattern_book/concurrent_pattern/barrier/main.go new file mode 100644 index 0000000..78cf34e --- /dev/null +++ b/go_design_pattern_book/concurrent_pattern/barrier/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "fmt" + "io/ioutil" + "net/http" + "time" +) + +func main() { + barrier("http://httpbin.org/headers", "http://httpbin.org/User-Agen") +} + +var timeoutMilliseconds int = 5000 + +type barrierResp struct { + Err error + Resp string +} + +func barrier(endpoints ...string) { + requestNumber := len(endpoints) + in := make(chan barrierResp, requestNumber) + defer close(in) + responses := make([]barrierResp, requestNumber) + for _, endpoint := range endpoints { + go makeRequest(in, endpoint) + } + var hasError bool + for i := 0; i < requestNumber; i++ { + resp := <-in + if resp.Err != nil { + fmt.Println("ERROR: ", resp.Err) + hasError = true + break + } + responses[i] = resp + } + + if !hasError { + for _, resp := range responses { + fmt.Println(resp.Resp) + } + } +} + +func makeRequest(out chan<- barrierResp, url string) { + res := barrierResp{} + client := http.Client{ + Timeout: time.Duration(time.Duration(timeoutMilliseconds) * time.Millisecond), + } + + resp, err := client.Get(url) + if err != nil { + res.Err = err + out <- res + return + } + + byt, err := ioutil.ReadAll(resp.Body) + if err != nil { + res.Err = err + out <- res + return + } + + res.Resp = string(byt) + out <- res +} diff --git a/go_design_pattern_book/concurrent_pattern/barrier/main_test.go b/go_design_pattern_book/concurrent_pattern/barrier/main_test.go new file mode 100644 index 0000000..2abe4ff --- /dev/null +++ b/go_design_pattern_book/concurrent_pattern/barrier/main_test.go @@ -0,0 +1,15 @@ +package main + +import "testing" + +func TestBarrier(t *testing.T) { + t.Run("Correct endpoints", func(t *testing.T) { + endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"} + }) + t.Run("One endpoint incorrect", func(t *testing.T) { + endpoints := []string{"http://malformed-url", "http://httpbin.org/User-Agent"} + }) + t.Run("Very Short Timeout", func(t *testing.T) { + endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"} + }) +}