假设有个字符串:var str = "The MapReduce library in the user program first splits the input files into M pie
假设有个字符串:
var str = "The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines."
需要统计里面每个字母出现的次数。最直观简单的做法就是利用一个 map,从开始到末尾读这个字符串,并把字母作为 key,出现的次数作为 value。Map 中包含 key 的时候,value + 1,Map 中没有 key 的时候默认 1。最后读完这个字符串就 OK。
var m = make(map[string]int)temp := strings.Split(str, "")for _, c := range temp { if !unicode.IsLetter([]rune(c)[0]) { continue } if count, ok := m[c]; ok { m[c] = count + 1 } else { m[c] = 1 }}
[M:3 R:1 y:7 o:13 v:1 e:26 h:7 l:10 i:14 r:15 T:1 p:13 d:1 u:6 c:8 b:5 s:14 g:4 a:17 f:5 m:7 t:20 B:1 I:1 n:10]
在现实世界中,这个 str
可能非常巨大,所以有时候我们需要将源文本拆分成多个小的字符串,然后多个线程同时处理,每个线程计算得到当前的中间结果,最后合并到一起。
上述的过程在函数式编程中可以被抽象为 Map 和 Reduce 两个函数。其中 Map 函数是把一个数组的每个元素按照相同的逻辑处理之后返回的结果,Reduce 函数是把所有元素整合起来得到结果。通常这个两个函数的参数都是函数,Map 的返回值一般也是数组,Reduce 的返回值可能是各种类型。
.为了在单机上实现出并发处理的效果,可以用 Go 自带的 goroutine 来实现。下面把拆分的工作省略,直接进入主题
接下来用 4 个 goroutine 同时处理这些 string,每个做 goroutine 利用 单机串行版
的逻辑,生产出一个小规模的中间内容。随后把每个中间内容都整合起来得到最终值。接下来需要考虑
- Go 天生支持 CSP 编程模型,所以利用 channel 做通信没有问题
- 是否有 data race
package mainimport ( "strings" "sync" "unicode")type ResultMap struct { sync.Mutex result map[string]int}func main() { str1 := "The MapReduce library in the user program first" str2 := "splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB)" str3 := "per piece (controllable by the user via an optional parameter)." str4 := "It then starts up many copies of the program on a cluster of machines." strs := []string {str1, str2, str3, str4} // 主线程需要阻塞直到所有的 reduce 都结束 var waitGroup sync.WaitGroup waitGroup.Add(len(strs)) c := make(chan map[string]int) res := new(ResultMap) res.result = make(map[string]int) for _, str := range strs { go doMap(str, c) go doReduce(c, res, &waitGroup) } waitGroup.Wait() sortPrintMap(res.result)}// 生产出对应的 kv 传递给 channelfunc doMap(str string, c chan map[string]int) { temp := strings.Split(str, "") m := make(map[string]int) for _, c := range temp { if !unicode.IsLetter([]rune(c)[0]) { continue } if count, ok := m[c]; ok { m[c] = count + 1 } else { m[c] = 1 } } c <- m}// 合并func doReduce(c chan map[string]int, res *ResultMap, group *sync.WaitGroup) { res.Lock() defer res.Unlock() for k, v := range <- c { if count, ok := res.result[k]; ok { res.result[k] = count + v } else { res.result[k] = v } } group.Done()}
检查一下结果 (Map 的 key 本身是无序的,这里是排好序之后的)
[M:3 R:1 y:7 o:13 v:1 e:26 h:7 l:10 i:14 r:15 T:1 p:13 d:1 u:6 c:8 b:5 s:14 g:4 a:17 f:5 m:7 t:20 B:1 I:1 n:10]
结果无误之后,这个问题可以再深入
- 上述的 reduce 和 map 是单机上的,之间的数据共享用了 channel,如果是物理隔离的场景下,如何用别的东西做数据共享?
- 任何一个子任务都有可能因为各种原因挂掉,如何在某个子任务挂掉的情况下,系统的准确性不受影响,甚至能自愈?
- 上述的 goroutine 在执行结束之后就会被调度器回收,但实际上因为 map 总是会比 reduce 先结束,那么后期的过程实际上可以有更多的 goroutine 可以参与到 reduce 任务中 r 如何实现这种调度让资源可以被更加充分的利用?
.

- 0