Map-Reduce 论文阅读
分析
实验要求 在 mr/master.go, mr/worker.go, and mr/rpc.go.
中编码,并且只指定了 reduce worker
的数量:nreduce
。
master
最后要以 go run mrmaster.go pg-*.txt
的方式运行,每一个.txt
文件对应一个split
,一个split
分派一个mapworker
worker
以go run mrworker.go wc.so
这样的方式运行,因为有多个worker
并行运行,所以master
要考虑线程安全。
考虑到worker
在写入文件的过程中挂掉,遗留未完成的输出文件, 要用 ioutil.TempFile
创建临时文件,写完后, os.Rename
为目标文件。
中间文件用 mr-X-Y
的格式命名, 结果文件以 mr-out-X
的方式命名
思路
master
来看看master
要注意的:
- keepalive,
workers
要定时ping master
, master
得剔除长时间不在线的worker
- 分发
maptask
给mapworker
,
mapworker
完成 maptask
之后,记录每个mapworker
的输出结果。
- 整合
mapworker
完成后的工作,分派任务给 reduceworker
- 等待所有
reduceworker
完成工作。
worker
再来看看worker
要注意什么:
- 定时向
master
ping,
- 当空闲时候, 向
master
询问有没有工作要做? 让我做mapworker
?还是reduceworker
? 要是没活要干的话,那我wait
,等会儿再来问一遍
- proc assigned task
- notify master when task done
实现
master
一启动就进入mapPhase
向新来的worker
分派maptask
,当所有maptask
做完后,进入 ReducePhase
,所有reducetask
做完后,进入Done
状态,然后退出
1
2
3
4
5
6
|
type ProgressStatus int
const (
MapPhase ProgressStatus = iota + 1
ReducePhase
Done
)
|
master
还得记录任务的状态:
- 还没指派 worker
- 有 worker 在做了
- 做完了
1
2
3
4
5
6
|
type taskStatus int
const (
notAssign taskStatus = iota + 1
doing
finished
)
|
对于每一个任务,得记录这个人物安排给了那个 woker,这个任务的状态,输入文件路径,输出结果路径
1
2
3
4
5
6
7
8
9
10
11
12
13
|
type mapTask struct {
bindWorker string
ins []string
outs map[int]string
status taskStatus
}
type reduceTask struct {
bindWorker string
ins []string
out string
status taskStatus
}
|
master
的元信息要考虑workers
的状态: wkerStatus
里的lastOnline
。
1
2
3
4
5
6
7
8
9
10
11
|
type meta struct {
nReducer int
mapTasks []mapTask
reduceTasks []reduceTask
workers map[string]wkerStatus
}
type wkerStatus struct {
id string
lastOnline time.Time
}
|
1
2
3
4
5
6
|
type Master struct {
sync.Mutex
meta meta
phase ProgressStatus
}
|
master
定时剔除 offline workers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
func (m *Master) updateWorkers() {
m.Lock()
defer m.Unlock()
for _, worker := range m.meta.workers {
if time.Now().Sub(worker.lastOnline).Seconds() <= 10 {
continue
}
switch m.phase {
case MapPhase:
for i, task := range m.meta.mapTasks {
if task.bindWorker == worker.id && task.status == doing {
m.meta.mapTasks[i].status = notAssign
delete(m.meta.workers, worker.id)
log.Printf("REMOVE worker %s from map tasks", worker.id)
}
}
case ReducePhase:
for i, task := range m.meta.reduceTasks {
if task.bindWorker == worker.id && task.status == doing {
m.meta.reduceTasks[i].status = notAssign
delete(m.meta.workers, worker.id)
log.Printf("REMOVE worker %s from reduce tasks", worker.id)
}
}
}
}
}
|
当worker
来发起taskRequest
时, master
的响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
func (m *Master) TaskRequest(rq *PingArgs, rp *TaskReply) error {
log.Printf("got task request from %s %s\n", rq.WorkerId, rq.Time.Format(time.RFC3339))
m.Lock()
defer m.Unlock()
switch m.phase {
case MapPhase:
for i, task := range m.meta.mapTasks {
if task.status == notAssign {
m.meta.mapTasks[i].status = doing
m.meta.mapTasks[i].bindWorker = rq.WorkerId
*rp = TaskReply{
TaskType: MapWorker,
Inputs: task.ins,
Nreduce: m.meta.nReducer,
TaskId: i,
}
log.Printf("assign map task [%d]to %s , rp %#v \n ",
i, rq.WorkerId, rp)
return nil
} else if task.status == doing {
log.Printf("map task %s is doing\n", task.bindWorker)
}
}
case ReducePhase:
for i, task := range m.meta.reduceTasks {
if task.status == notAssign {
m.meta.reduceTasks[i].status = doing
m.meta.reduceTasks[i].bindWorker = rq.WorkerId
*rp = TaskReply{
TaskType: ReduceWorker,
Nreduce: m.meta.nReducer,
Inputs: task.ins,
TaskId: i,
}
log.Printf("assign reduce task [%d]to %s \n ",
i, rq.WorkerId)
return nil
} else if task.status == doing {
log.Printf("reduce task %s is doing\n", task.bindWorker)
}
}
case Done:
log.Println("works already done")
rp.TaskType = WorkerExit
default:
log.Fatalf("unknown master phase %d\n", m.phase)
}
log.Println("nothing to do wait")
rp.TaskType = Wait
return nil
}
|
当worker
做完工作,通知 master 时, master
的响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
func (m *Master) TaskDone(rq *DoneArgs, rp *DoneReply) error {
m.Lock()
defer m.Unlock()
switch m.phase {
case MapPhase:
m.meta.mapTasks[rq.WorkIdx].status = finished
log.Printf("[%s] [%d] map task Done\n", rq.WorkerId, rq.WorkIdx)
dones := 0
for _, task := range m.meta.mapTasks {
if task.status == finished {
dones++
}
}
if len(rq.Files) != m.meta.nReducer {
log.Fatalf("request files count %d != %d\n",
len(rq.Files), m.meta.nReducer)
}
for idxReduce, file := range rq.Files {
m.meta.reduceTasks[idxReduce].ins =
append(m.meta.reduceTasks[idxReduce].ins, file)
}
if dones == len(m.meta.mapTasks) {
log.Println("go from mapPhae to ReducePhase")
m.phase = ReducePhase
} else {
log.Printf("%d task doing\n", len(m.meta.mapTasks)-dones)
}
case ReducePhase:
m.meta.reduceTasks[rq.WorkIdx].status = finished
log.Printf("[%s] [%d] reduce task Done\n", rq.WorkerId, rq.WorkIdx)
dones := 0
for _, task := range m.meta.reduceTasks {
if task.status == finished {
dones++
}
}
if dones == len(m.meta.reduceTasks) {
m.phase = Done
log.Println("leave ReducePhase")
}
case Done:
log.Println("already Done !!!!!")
default:
log.Fatalf("unknown master phase %d\n", m.phase)
}
return nil
}
|
来看看 worker
这边
woker
向master
请求指派工作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func (w *WorkerT) requestTask() (exit bool) {
task, err := wk.task()
if err != nil {
log.Fatalf("call task %s\n", err)
}
switch task.TaskType {
case MapWorker:
w.mapWork(task)
case ReduceWorker:
w.reduceWork(task)
case Wait:
time.Sleep(time.Second)
case WorkerExit:
return true
default:
log.Fatalf("unkown task type: %d %#v\n ", task.TaskType, task)
return
}
return false
}
|
worker
收到mapworker
类型的工作,执行工作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
func (w *WorkerT) mapWork(task *TaskReply) {
log.Println("do map")
filename := task.Inputs[0]
if len(task.Inputs) == 0 {
log.Fatalln("no inputs")
}
inputFile, err := ioutil.ReadFile(filename)
if err != nil {
log.Fatalf("read %s err %s\n", task.Inputs[0], err)
}
mapResults := w.mapf(filename, string(inputFile))
outFileNameFunc := func(idxReduce int) string {
return fmt.Sprintf("mr-%d-%d", task.TaskId, idxReduce)
}
var tmps = make(map[int]struct {
*os.File
*json.Encoder
},
)
for i := 0; i < task.Nreduce; i++ {
tempFile, err := ioutil.TempFile(w.currentWD, "*")
if err != nil {
log.Fatalf("create temp file %s\n", err)
}
tmps[i] = struct {
*os.File
*json.Encoder
}{
File: tempFile,
Encoder: json.NewEncoder(tempFile),
}
}
for i, kv := range mapResults {
idxReduce := ihash(mapResults[i].Key) % task.Nreduce
if err := tmps[idxReduce].Encode(KeyValue{
Key: kv.Key,
Value: kv.Value,
}); err != nil {
log.Fatalf("encode %s\n", err)
}
}
for _, tmp := range tmps {
tmp.Close()
}
var intermediates = make([]string, task.Nreduce)
for idxReduce, file := range tmps {
path := outFileNameFunc(idxReduce)
if err := os.Rename(file.Name(), path); err != nil {
log.Fatalf("rename from %s to %s error %s\n",
file.Name(), path, err)
} else {
log.Printf("idxRecuce %d, %s Done\n", idxReduce, path)
}
intermediates[idxReduce] = path
}
log.Printf("worker finished job. MapPhase: %d \n", task.TaskId)
wk.taskDone(task.TaskId, intermediates)
}
|
worker
收到reduceworker
类型的工作,执行工作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
func (w *WorkerT) reduceWork(task *TaskReply) {
log.Println("do reduce")
results := make(map[string][]string)
for _, input := range task.Inputs {
file, err := os.Open(input)
if err != nil {
log.Fatalf("read %s %s\n", input, err)
}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
var kv KeyValue
json.Unmarshal([]byte(scanner.Text()), &kv)
results[kv.Key] = append(results[kv.Key], kv.Value)
}
}
tmp, err := ioutil.TempFile(w.currentWD, "*")
defer tmp.Close()
if err != nil {
log.Fatalf("create temp file %s\n", err)
}
var kvs KeyValues
for k, v := range results {
reduceOut := w.reducef(k, v)
kvs = append(kvs, KeyValue{
Key: k,
Value: reduceOut,
})
}
sort.Sort(kvs)
for _, kv := range kvs {
if _, err := fmt.Fprintf(tmp, "%v %v\n", kv.Key, kv.Value); err != nil {
log.Fatalf("write temp %s\n", err)
}
}
path := fmt.Sprintf("mr-out-%d", task.TaskId)
if err := os.Rename(tmp.Name(), path); err != nil {
log.Fatalf("rename from %s to %s %s\n", tmp.Name(), path, err)
}
log.Printf("woker finished job, ReducePhase: %d, output: %s\n", task.TaskId, path)
wk.taskDone(task.TaskId, nil)
}
|
测试
官方提供的test-mr.sh
- 首先是
wc test
和 indexer test
, 以 golang plugin
的方式,用不同的func Map(string,string)[]mr.KeyValue
和func Reduce(string, []string) string
构建worker
。测试程序功能是否正常
map parallelism test
和 reduce parallelism test
:分别测试同一时刻的mapworker
和reduceworker
是否是并行执行的。
- 最后是
crash test
,crash.go
里面的 maybeCrash()
会使得 worker
随机挂掉。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func maybeCrash() {
max := big.NewInt(1000)
rr, _ := crand.Int(crand.Reader, max)
if rr.Int64() < 330 {
// crash!
os.Exit(1)
} else if rr.Int64() < 660 {
// delay for a while.
maxms := big.NewInt(10 * 1000)
ms, _ := crand.Int(crand.Reader, maxms)
time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
}
}
|
踩到的坑 / 写出的 bug
实验程序几乎是一次性写完的,一气呵成,但是测试过程中出现了两个问题:
mapworker 错误地分发了中间文件
当所有的workers
都完成master
指派的任务之后,测试用例发现最终的结果文件含有重复的key
,我感到很疑惑,我不是已经在 reduce worker
的处理过程中使用 map
对重复的 key-value
对做了group
吗?
随后我察觉到,是我在reduce worker
完成工作后,向master
通报taskDone
的 rpc
过程里的 []files
长度不是 nreduce
, 这样就导致 master
将mapworker
产生的中间文件错误地分发给了不同的reducetasks
:
由于疏忽,导致 master 没能剔除 offline worker
./mr_test.sh
除了了最后 的 maybe crash test
没过,其它的测试都PASSED
。
我察觉到有的worker crash
掉之后,master
并不会剔除那些挂掉的worker
, 我立即断定,肯定是 updateworkers
出了问题,仔细一看,因为我没有在 assignTask
时同时更新 tasks
的 workerId
字段, 导致我即使察觉到有的worker
挂掉之后,task.bindworker
永远不等于 worker.id
, 因此所有offline worker
永远不会剔除,程序会一直运行,直到超时。
总结
这个 demo
有个缺点,因为实验要求只有有一个 master
就行,所以只处理了 worker crash
的问题,没考虑 master crash
.
但是论文里要求 user program
得 fork()
出 多个进程, 当一个主 master
挂掉之后,其他的从 master
接着上,那么就得对 master
的元信息:meta
定时做序列化并持久化,和新选举的新master
对元信息反序列化。