mirror of
https://github.com/tmrts/go-patterns.git
synced 2024-11-22 04:56:09 +03:00
Implement parallelism patterns
This commit is contained in:
parent
4c276c72e1
commit
4f4ceb2dc1
121
bounded_parallelism/md5.go
Normal file
121
bounded_parallelism/md5.go
Normal file
@ -0,0 +1,121 @@
|
|||||||
|
package bounded_parallelism
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// walkFiles launches a goroutine that traverses the directory tree rooted at
// root and emits the path of every regular file on the returned string
// channel. The outcome of the traversal (nil on success) is delivered on the
// returned error channel. Closing done makes the traversal stop early with a
// "walk canceled" error.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	var (
		out      = make(chan string)
		walkErrs = make(chan error, 1)
	)

	// visit is invoked by filepath.Walk for every entry in the tree.
	visit := func(p string, fi os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			// Propagate traversal errors; fi must not be inspected here.
			return walkErr
		case !fi.Mode().IsRegular():
			// Directories, symlinks, devices, etc. are skipped.
			return nil
		}
		// Hand the path to a consumer unless the caller has given up.
		select {
		case <-done:
			return errors.New("walk canceled")
		case out <- p:
			return nil
		}
	}

	go func() {
		// The path channel is closed once the walk result has been
		// published, so consumers ranging over it terminate.
		defer close(out)
		// walkErrs has capacity one, so this send never blocks.
		walkErrs <- filepath.Walk(root, visit)
	}()

	return out, walkErrs
}
|
||||||
|
|
||||||
|
// result carries the outcome of reading one file and hashing it with MD5.
// The field order is significant: values are built with positional literals.
type result struct {
	path string         // file the digest belongs to
	sum  [md5.Size]byte // MD5 digest of the bytes that were read
	err  error          // error reported while reading the file, if any
}
|
||||||
|
|
||||||
|
// digester reads path names from paths and sends digests of the corresponding
|
||||||
|
// files on c until either paths or done is closed.
|
||||||
|
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
|
||||||
|
for path := range paths { // HLpaths
|
||||||
|
data, err := ioutil.ReadFile(path)
|
||||||
|
select {
|
||||||
|
case c <- result{path, md5.Sum(data), err}:
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MD5All reads all the files in the file tree rooted at root and returns a map
|
||||||
|
// from file path to the MD5 sum of the file's contents. If the directory walk
|
||||||
|
// fails or any read operation fails, MD5All returns an error. In that case,
|
||||||
|
// MD5All does not wait for inflight read operations to complete.
|
||||||
|
func MD5All(root string) (map[string][md5.Size]byte, error) {
|
||||||
|
// MD5All closes the done channel when it returns; it may do so before
|
||||||
|
// receiving all the values from c and errc.
|
||||||
|
done := make(chan struct{})
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
|
paths, errc := walkFiles(done, root)
|
||||||
|
|
||||||
|
// Start a fixed number of goroutines to read and digest files.
|
||||||
|
c := make(chan result) // HLc
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
const numDigesters = 20
|
||||||
|
wg.Add(numDigesters)
|
||||||
|
for i := 0; i < numDigesters; i++ {
|
||||||
|
go func() {
|
||||||
|
digester(done, paths, c) // HLc
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(c) // HLc
|
||||||
|
}()
|
||||||
|
// End of pipeline. OMIT
|
||||||
|
|
||||||
|
m := make(map[string][md5.Size]byte)
|
||||||
|
for r := range c {
|
||||||
|
if r.err != nil {
|
||||||
|
return nil, r.err
|
||||||
|
}
|
||||||
|
m[r.path] = r.sum
|
||||||
|
}
|
||||||
|
// Check whether the Walk failed.
|
||||||
|
if err := <-errc; err != nil { // HLerrc
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Calculate the MD5 sum of all files under the specified directory,
|
||||||
|
// then print the results sorted by path name.
|
||||||
|
m, err := MD5All(os.Args[1])
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var paths []string
|
||||||
|
for path := range m {
|
||||||
|
paths = append(paths, path)
|
||||||
|
}
|
||||||
|
sort.Strings(paths)
|
||||||
|
for _, path := range paths {
|
||||||
|
fmt.Printf("%x %s\n", m[path], path)
|
||||||
|
}
|
||||||
|
}
|
109
parallelism/md5.go
Normal file
109
parallelism/md5.go
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
package parallelism
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// result carries the outcome of reading one file and hashing it with MD5.
// The field order is significant: values are built with positional literals.
type result struct {
	path string         // file the digest belongs to
	sum  [md5.Size]byte // MD5 digest of the bytes that were read
	err  error          // error reported while reading the file, if any
}
|
||||||
|
|
||||||
|
// sumFiles starts goroutines to walk the directory tree at root and digest each
|
||||||
|
// regular file. These goroutines send the results of the digests on the result
|
||||||
|
// channel and send the result of the walk on the error channel. If done is
|
||||||
|
// closed, sumFiles abandons its work.
|
||||||
|
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
|
||||||
|
// For each regular file, start a goroutine that sums the file and sends
|
||||||
|
// the result on c. Send the result of the walk on errc.
|
||||||
|
c := make(chan result)
|
||||||
|
errc := make(chan error, 1)
|
||||||
|
go func() { // HL
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !info.Mode().IsRegular() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() { // HL
|
||||||
|
data, err := ioutil.ReadFile(path)
|
||||||
|
select {
|
||||||
|
case c <- result{path, md5.Sum(data), err}: // HL
|
||||||
|
case <-done: // HL
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
// Abort the walk if done is closed.
|
||||||
|
select {
|
||||||
|
case <-done: // HL
|
||||||
|
return errors.New("walk canceled")
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
})
|
||||||
|
// Walk has returned, so all calls to wg.Add are done. Start a
|
||||||
|
// goroutine to close c once all the sends are done.
|
||||||
|
go func() { // HL
|
||||||
|
wg.Wait()
|
||||||
|
close(c) // HL
|
||||||
|
}()
|
||||||
|
// No select needed here, since errc is buffered.
|
||||||
|
errc <- err // HL
|
||||||
|
}()
|
||||||
|
return c, errc
|
||||||
|
}
|
||||||
|
|
||||||
|
// MD5All reads all the files in the file tree rooted at root and returns a map
|
||||||
|
// from file path to the MD5 sum of the file's contents. If the directory walk
|
||||||
|
// fails or any read operation fails, MD5All returns an error. In that case,
|
||||||
|
// MD5All does not wait for inflight read operations to complete.
|
||||||
|
func MD5All(root string) (map[string][md5.Size]byte, error) {
|
||||||
|
// MD5All closes the done channel when it returns; it may do so before
|
||||||
|
// receiving all the values from c and errc.
|
||||||
|
done := make(chan struct{}) // HLdone
|
||||||
|
defer close(done) // HLdone
|
||||||
|
|
||||||
|
c, errc := sumFiles(done, root) // HLdone
|
||||||
|
|
||||||
|
m := make(map[string][md5.Size]byte)
|
||||||
|
for r := range c { // HLrange
|
||||||
|
if r.err != nil {
|
||||||
|
return nil, r.err
|
||||||
|
}
|
||||||
|
m[r.path] = r.sum
|
||||||
|
}
|
||||||
|
if err := <-errc; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Calculate the MD5 sum of all files under the specified directory,
|
||||||
|
// then print the results sorted by path name.
|
||||||
|
m, err := MD5All(os.Args[1])
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var paths []string
|
||||||
|
for path := range m {
|
||||||
|
paths = append(paths, path)
|
||||||
|
}
|
||||||
|
sort.Strings(paths)
|
||||||
|
for _, path := range paths {
|
||||||
|
fmt.Printf("%x %s\n", m[path], path)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user