Map-Reduce Paper Reading
Analysis
The lab asks us to write code in mr/master.go, mr/worker.go, and mr/rpc.go, and the only parameter it fixes is the number of reduce tasks: nReduce.
The master is started as `go run mrmaster.go pg-*.txt`; each .txt file is one split, and each split is handed to one map worker.
A worker is started as `go run mrworker.go wc.so`. Since several workers run in parallel, the master has to be thread-safe.
A worker may crash in the middle of writing a file and leave a half-finished output behind, so output should be written to a temporary file created with ioutil.TempFile and only renamed to the target name with os.Rename once the write completes.
Intermediate files are named mr-X-Y, and the final result files are named mr-out-X.
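The write-to-temp-then-rename trick looks roughly like the sketch below. It only illustrates the pattern the lab suggests; the helper name atomicWrite and its arguments are made up here, not part of the lab code (imports: io/ioutil, os, path/filepath).

```go
// atomicWrite writes data to a temp file in dir and only renames it to the
// final name after the write succeeds, so a crash mid-write never leaves a
// partially written target file behind.
func atomicWrite(dir, name string, data []byte) error {
    tmp, err := ioutil.TempFile(dir, "*")
    if err != nil {
        return err
    }
    if _, err := tmp.Write(data); err != nil {
        tmp.Close()
        return err
    }
    if err := tmp.Close(); err != nil {
        return err
    }
    // Rename is atomic on the same filesystem: readers see either nothing
    // or the complete file, never a half-written one.
    return os.Rename(tmp.Name(), filepath.Join(dir, name))
}
```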
Approach
Master
What the master has to take care of:
- Keepalive: workers ping the master periodically, and the master kicks out workers that have been offline for too long.
- Dispatch: hand map tasks to map workers, and record each map worker's output when its task finishes.
- Aggregation: once the map workers are done, hand out reduce tasks to reduce workers.
- Wait for all reduce workers to finish.
Worker
And what the worker has to take care of:
- Ping the master periodically.
- When idle, ask the master whether there is work to do: should I act as a map worker or a reduce worker? If there is nothing to do, wait a while and ask again.
- Process the assigned task.
- Notify the master when the task is done.
Implementation
As soon as the master starts it enters MapPhase and hands map tasks to incoming workers; when all map tasks are finished it switches to ReducePhase, and when all reduce tasks are finished it enters the Done state and exits.
```go
type ProgressStatus int

const (
    MapPhase ProgressStatus = iota + 1
    ReducePhase
    Done
)
```
The master also has to track each task's status:
- not yet assigned to a worker
- currently being worked on
- finished
```go
type taskStatus int

const (
    notAssign taskStatus = iota + 1
    doing
    finished
)
```
For every task, the master records which worker it is assigned to, the task's status, the input file paths, and the output paths.
```go
type mapTask struct {
    bindWorker string
    ins        []string
    outs       map[int]string
    status     taskStatus
}

type reduceTask struct {
    bindWorker string
    ins        []string
    out        string
    status     taskStatus
}
```
The master's metadata also has to cover worker state, in particular lastOnline in wkerStatus.
```go
type meta struct {
    nReducer    int
    mapTasks    []mapTask
    reduceTasks []reduceTask
    workers     map[string]wkerStatus
}

type wkerStatus struct {
    id         string
    lastOnline time.Time
}
```
The Master itself then just bundles the metadata, the current phase, and a mutex:

```go
type Master struct {
    sync.Mutex
    meta  meta
    phase ProgressStatus
}
```
The master periodically kicks out offline workers:
```go
func (m *Master) updateWorkers() {
    m.Lock()
    defer m.Unlock()
    for _, worker := range m.meta.workers {
        if time.Now().Sub(worker.lastOnline).Seconds() <= 10 {
            continue
        }
        switch m.phase {
        case MapPhase:
            for i, task := range m.meta.mapTasks {
                if task.bindWorker == worker.id && task.status == doing {
                    m.meta.mapTasks[i].status = notAssign
                    delete(m.meta.workers, worker.id)
                    log.Printf("REMOVE worker %s from map tasks", worker.id)
                }
            }
        case ReducePhase:
            for i, task := range m.meta.reduceTasks {
                if task.bindWorker == worker.id && task.status == doing {
                    m.meta.reduceTasks[i].status = notAssign
                    delete(m.meta.workers, worker.id)
                    log.Printf("REMOVE worker %s from reduce tasks", worker.id)
                }
            }
        }
    }
}
```
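updateWorkers still has to be driven on a timer, and lastOnline has to be refreshed whenever a worker pings; neither piece is shown above. A minimal sketch of what they could look like, assuming a Master.Ping RPC handler and a background ticker (PingReply, startReaper, and the 2-second interval are my own choices, not the lab's API):

```go
// PingReply is a hypothetical empty reply for the keepalive RPC.
type PingReply struct{}

// Ping is a hypothetical keepalive handler: it records when the worker last
// contacted the master so updateWorkers can detect workers that went silent.
func (m *Master) Ping(rq *PingArgs, rp *PingReply) error {
    m.Lock()
    defer m.Unlock()
    m.meta.workers[rq.WorkerId] = wkerStatus{
        id:         rq.WorkerId,
        lastOnline: time.Now(),
    }
    return nil
}

// startReaper runs updateWorkers every couple of seconds in the background.
func (m *Master) startReaper() {
    go func() {
        ticker := time.NewTicker(2 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            m.updateWorkers()
        }
    }()
}
```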
When a worker sends a task request, the master responds like this:
```go
func (m *Master) TaskRequest(rq *PingArgs, rp *TaskReply) error {
    log.Printf("got task request from %s %s\n", rq.WorkerId, rq.Time.Format(time.RFC3339))
    m.Lock()
    defer m.Unlock()
    switch m.phase {
    case MapPhase:
        for i, task := range m.meta.mapTasks {
            if task.status == notAssign {
                m.meta.mapTasks[i].status = doing
                m.meta.mapTasks[i].bindWorker = rq.WorkerId
                *rp = TaskReply{
                    TaskType: MapWorker,
                    Inputs:   task.ins,
                    Nreduce:  m.meta.nReducer,
                    TaskId:   i,
                }
                log.Printf("assign map task [%d] to %s, rp %#v\n",
                    i, rq.WorkerId, rp)
                return nil
            } else if task.status == doing {
                log.Printf("map task %s is doing\n", task.bindWorker)
            }
        }
    case ReducePhase:
        for i, task := range m.meta.reduceTasks {
            if task.status == notAssign {
                m.meta.reduceTasks[i].status = doing
                m.meta.reduceTasks[i].bindWorker = rq.WorkerId
                *rp = TaskReply{
                    TaskType: ReduceWorker,
                    Nreduce:  m.meta.nReducer,
                    Inputs:   task.ins,
                    TaskId:   i,
                }
                log.Printf("assign reduce task [%d] to %s\n",
                    i, rq.WorkerId)
                return nil
            } else if task.status == doing {
                log.Printf("reduce task %s is doing\n", task.bindWorker)
            }
        }
    case Done:
        log.Println("works already done")
        rp.TaskType = WorkerExit
        // return here so the Wait assignment below does not overwrite WorkerExit
        return nil
    default:
        log.Fatalf("unknown master phase %d\n", m.phase)
    }
    log.Println("nothing to do, wait")
    rp.TaskType = Wait
    return nil
}
```
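The RPC argument and reply types live in mr/rpc.go and are not shown in this post. Reconstructing them from the way they are used above, they presumably look something like the following (the field and constant names come from the code above, but the exact definitions are guesses):

```go
// Kinds of work the master can hand back to a worker.
type workerType int

const (
    MapWorker workerType = iota + 1
    ReduceWorker
    Wait       // nothing to hand out right now, ask again later
    WorkerExit // everything is done, the worker may exit
)

// PingArgs identifies the calling worker; it doubles as the keepalive payload.
type PingArgs struct {
    WorkerId string
    Time     time.Time
}

// TaskReply describes the task assigned to the worker.
type TaskReply struct {
    TaskType workerType
    TaskId   int      // index into mapTasks or reduceTasks
    Inputs   []string // input files for this task
    Nreduce  int      // number of reduce tasks, used for partitioning map output
}

// DoneArgs reports a finished task back to the master.
type DoneArgs struct {
    WorkerId string
    WorkIdx  int      // index of the finished task
    Files    []string // for map tasks: the nReduce intermediate files produced
}

type DoneReply struct{}
```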
When a worker finishes its task and notifies the master, the master handles it like this:
```go
func (m *Master) TaskDone(rq *DoneArgs, rp *DoneReply) error {
    m.Lock()
    defer m.Unlock()
    switch m.phase {
    case MapPhase:
        m.meta.mapTasks[rq.WorkIdx].status = finished
        log.Printf("[%s] [%d] map task Done\n", rq.WorkerId, rq.WorkIdx)
        dones := 0
        for _, task := range m.meta.mapTasks {
            if task.status == finished {
                dones++
            }
        }
        if len(rq.Files) != m.meta.nReducer {
            log.Fatalf("request files count %d != %d\n",
                len(rq.Files), m.meta.nReducer)
        }
        for idxReduce, file := range rq.Files {
            m.meta.reduceTasks[idxReduce].ins =
                append(m.meta.reduceTasks[idxReduce].ins, file)
        }
        if dones == len(m.meta.mapTasks) {
            log.Println("go from MapPhase to ReducePhase")
            m.phase = ReducePhase
        } else {
            log.Printf("%d task doing\n", len(m.meta.mapTasks)-dones)
        }
    case ReducePhase:
        m.meta.reduceTasks[rq.WorkIdx].status = finished
        log.Printf("[%s] [%d] reduce task Done\n", rq.WorkerId, rq.WorkIdx)
        dones := 0
        for _, task := range m.meta.reduceTasks {
            if task.status == finished {
                dones++
            }
        }
        if dones == len(m.meta.reduceTasks) {
            m.phase = Done
            log.Println("leave ReducePhase")
        }
    case Done:
        log.Println("already Done !!!!!")
    default:
        log.Fatalf("unknown master phase %d\n", m.phase)
    }
    return nil
}
```
Now for the worker side.
The worker asks the master to assign it some work:
```go
func (w *WorkerT) requestTask() (exit bool) {
    task, err := wk.task()
    if err != nil {
        log.Fatalf("call task %s\n", err)
    }
    switch task.TaskType {
    case MapWorker:
        w.mapWork(task)
    case ReduceWorker:
        w.reduceWork(task)
    case Wait:
        time.Sleep(time.Second)
    case WorkerExit:
        return true
    default:
        log.Fatalf("unknown task type: %d %#v\n", task.TaskType, task)
        return
    }
    return false
}
```
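requestTask is presumably called in a loop from the Worker entry point in mr/worker.go (the code above also refers to a wk handle, which looks like a package-level *WorkerT). A sketch of how that glue might look; the WorkerT fields, the ping method, the use of the pid as worker id, and the call helper are assumptions on my part:

```go
type WorkerT struct {
    id        string
    currentWD string // directory for temp files, used by mapWork/reduceWork
    mapf      func(string, string) []KeyValue
    reducef   func(string, []string) string
}

// wk is the package-level handle that requestTask/taskDone above refer to.
var wk *WorkerT

// Worker is called by main/mrworker.go with the plugin's Map and Reduce.
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    wd, _ := os.Getwd()
    wk = &WorkerT{
        id:        strconv.Itoa(os.Getpid()), // assumption: pid as worker id
        currentWD: wd,
        mapf:      mapf,
        reducef:   reducef,
    }
    // Keepalive: ping the master in the background so it knows we are alive.
    go func() {
        for {
            wk.ping()
            time.Sleep(time.Second)
        }
    }()
    // Keep asking for work until the master tells us to exit.
    for {
        if exit := wk.requestTask(); exit {
            return
        }
    }
}

// ping wraps the keepalive RPC; call() is the RPC helper from the lab skeleton.
func (w *WorkerT) ping() {
    call("Master.Ping", &PingArgs{WorkerId: w.id, Time: time.Now()}, &PingReply{})
}
```

The task() and taskDone() methods used in the worker code are assumed to wrap the Master.TaskRequest and Master.TaskDone RPCs through the same call() helper; they are not repeated here.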
When the worker receives a map-type task, it runs it:
```go
func (w *WorkerT) mapWork(task *TaskReply) {
    log.Println("do map")
    if len(task.Inputs) == 0 {
        log.Fatalln("no inputs")
    }
    filename := task.Inputs[0]
    inputFile, err := ioutil.ReadFile(filename)
    if err != nil {
        log.Fatalf("read %s err %s\n", task.Inputs[0], err)
    }
    mapResults := w.mapf(filename, string(inputFile))
    outFileNameFunc := func(idxReduce int) string {
        return fmt.Sprintf("mr-%d-%d", task.TaskId, idxReduce)
    }
    var tmps = make(map[int]struct {
        *os.File
        *json.Encoder
    })
    for i := 0; i < task.Nreduce; i++ {
        tempFile, err := ioutil.TempFile(w.currentWD, "*")
        if err != nil {
            log.Fatalf("create temp file %s\n", err)
        }
        tmps[i] = struct {
            *os.File
            *json.Encoder
        }{
            File:    tempFile,
            Encoder: json.NewEncoder(tempFile),
        }
    }
    for i, kv := range mapResults {
        idxReduce := ihash(mapResults[i].Key) % task.Nreduce
        if err := tmps[idxReduce].Encode(KeyValue{
            Key:   kv.Key,
            Value: kv.Value,
        }); err != nil {
            log.Fatalf("encode %s\n", err)
        }
    }
    for _, tmp := range tmps {
        tmp.Close()
    }
    var intermediates = make([]string, task.Nreduce)
    for idxReduce, file := range tmps {
        path := outFileNameFunc(idxReduce)
        if err := os.Rename(file.Name(), path); err != nil {
            log.Fatalf("rename from %s to %s error %s\n",
                file.Name(), path, err)
        } else {
            log.Printf("idxReduce %d, %s Done\n", idxReduce, path)
        }
        intermediates[idxReduce] = path
    }
    log.Printf("worker finished job. MapPhase: %d\n", task.TaskId)
    wk.taskDone(task.TaskId, intermediates)
}
```
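The ihash helper used above comes from the lab skeleton; as far as I remember it is an FNV-1a hash with the sign bit masked off (import hash/fnv), roughly:

```go
// ihash(key) % NReduce picks the reduce bucket for a key emitted by Map.
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}
```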
When the worker receives a reduce-type task, it runs it:
```go
func (w *WorkerT) reduceWork(task *TaskReply) {
    log.Println("do reduce")
    results := make(map[string][]string)
    for _, input := range task.Inputs {
        file, err := os.Open(input)
        if err != nil {
            log.Fatalf("read %s %s\n", input, err)
        }
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            var kv KeyValue
            json.Unmarshal([]byte(scanner.Text()), &kv)
            results[kv.Key] = append(results[kv.Key], kv.Value)
        }
        file.Close()
    }
    tmp, err := ioutil.TempFile(w.currentWD, "*")
    if err != nil {
        log.Fatalf("create temp file %s\n", err)
    }
    defer tmp.Close()
    var kvs KeyValues
    for k, v := range results {
        reduceOut := w.reducef(k, v)
        kvs = append(kvs, KeyValue{
            Key:   k,
            Value: reduceOut,
        })
    }
    sort.Sort(kvs)
    for _, kv := range kvs {
        if _, err := fmt.Fprintf(tmp, "%v %v\n", kv.Key, kv.Value); err != nil {
            log.Fatalf("write temp %s\n", err)
        }
    }
    path := fmt.Sprintf("mr-out-%d", task.TaskId)
    if err := os.Rename(tmp.Name(), path); err != nil {
        log.Fatalf("rename from %s to %s %s\n", tmp.Name(), path, err)
    }
    log.Printf("worker finished job, ReducePhase: %d, output: %s\n", task.TaskId, path)
    wk.taskDone(task.TaskId, nil)
}
```
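KeyValue is the pair type defined by the lab skeleton; KeyValues with a sort.Interface implementation is presumably a small addition so that sort.Sort(kvs) above compiles. Something like:

```go
// KeyValue is the pair type emitted by Map and written by Reduce
// (defined in the lab skeleton).
type KeyValue struct {
    Key   string
    Value string
}

// KeyValues implements sort.Interface so the reduce output can be written
// in key order.
type KeyValues []KeyValue

func (k KeyValues) Len() int           { return len(k) }
func (k KeyValues) Less(i, j int) bool { return k[i].Key < k[j].Key }
func (k KeyValues) Swap(i, j int)      { k[i], k[j] = k[j], k[i] }
```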
Testing
The course provides test-mr.sh:
- First come the wc test and the indexer test, which build workers as Go plugins from different `func Map(string, string) []mr.KeyValue` and `func Reduce(string, []string) string` implementations and check that the output is correct (a sketch of how such a plugin is loaded follows the crash code below).
- The map parallelism test and reduce parallelism test check that map workers and reduce workers really run in parallel.
- Finally the crash test: maybeCrash() in crash.go makes workers die at random.
```go
func maybeCrash() {
    max := big.NewInt(1000)
    rr, _ := crand.Int(crand.Reader, max)
    if rr.Int64() < 330 {
        // crash!
        os.Exit(1)
    } else if rr.Int64() < 660 {
        // delay for a while.
        maxms := big.NewInt(10 * 1000)
        ms, _ := crand.Int(crand.Reader, maxms)
        time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
    }
}
```
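For completeness, wc.so and indexer.so are loaded by main/mrworker.go through Go's plugin package; to the best of my recollection the skeleton's loadPlugin looks roughly like this:

```go
// loadPlugin opens a compiled application plugin (e.g. wc.so) and looks up
// its exported Map and Reduce functions.
func loadPlugin(filename string) (func(string, string) []mr.KeyValue,
    func(string, []string) string) {
    p, err := plugin.Open(filename)
    if err != nil {
        log.Fatalf("cannot load plugin %v", filename)
    }
    xmapf, err := p.Lookup("Map")
    if err != nil {
        log.Fatalf("cannot find Map in %v", filename)
    }
    mapf := xmapf.(func(string, string) []mr.KeyValue)
    xreducef, err := p.Lookup("Reduce")
    if err != nil {
        log.Fatalf("cannot find Reduce in %v", filename)
    }
    reducef := xreducef.(func(string, []string) string)
    return mapf, reducef
}
```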
Pitfalls / bugs I wrote
The lab code was written almost in one sitting, but two problems turned up during testing:
The map worker's intermediate files were handed to the wrong reduce tasks
After all workers had finished the tasks the master assigned, the test found duplicate keys in the final output files. That puzzled me: hadn't I already grouped duplicate key-value pairs with a map inside the reduce worker?
Then I noticed the real cause: in the taskDone RPC reporting a finished map task, the []files slice I sent did not have length nReduce, so the master appended the map worker's intermediate files to the wrong reduce tasks.
An oversight kept the master from removing offline workers
Running ./test-mr.sh, everything PASSED except the final crash test.
I saw that after some workers crashed, the master never removed them. I immediately suspected updateWorkers, and a closer look confirmed it: I had not set the task's bindWorker field when assigning the task, so even after a worker went offline, task.bindWorker never equaled worker.id. As a result no offline worker was ever removed and the run just kept going until it timed out.
Summary
This demo has one shortcoming: the lab only asks for a single master, so it only handles worker crashes and ignores master crashes.
In the paper, however, the user program fork()s many processes, and when the primary master dies another one takes over. Supporting that would mean periodically serializing and persisting the master's metadata (meta), and having the newly elected master deserialize it on startup.
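A rough, untested sketch of what such a checkpoint could look like, using encoding/gob and the same write-temp-then-rename trick as the task outputs (the Snapshot type, the choice of fields, and the checkpoint path are all my own choices; gob only encodes exported fields, hence the copy out of meta):

```go
// Snapshot is a hypothetical serializable view of the master's progress.
type Snapshot struct {
    Phase        ProgressStatus
    NReducer     int
    MapStatus    []taskStatus
    ReduceStatus []taskStatus
    ReduceIns    [][]string
}

// checkpoint persists a Snapshot atomically so a standby master never reads
// a half-written checkpoint file.
func (m *Master) checkpoint(path string) error {
    m.Lock()
    s := Snapshot{Phase: m.phase, NReducer: m.meta.nReducer}
    for _, t := range m.meta.mapTasks {
        s.MapStatus = append(s.MapStatus, t.status)
    }
    for _, t := range m.meta.reduceTasks {
        s.ReduceStatus = append(s.ReduceStatus, t.status)
        s.ReduceIns = append(s.ReduceIns, t.ins)
    }
    m.Unlock()

    tmp, err := ioutil.TempFile(".", "*")
    if err != nil {
        return err
    }
    if err := gob.NewEncoder(tmp).Encode(s); err != nil {
        tmp.Close()
        return err
    }
    if err := tmp.Close(); err != nil {
        return err
    }
    return os.Rename(tmp.Name(), path)
}
```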