
MIT-6.824 Distributed Systems Lab 1 (MapReduce)


Map-Reduce paper reading notes

Analysis

The lab has you write your code in mr/master.go, mr/worker.go, and mr/rpc.go, and it specifies only the number of reduce tasks, nReduce.

The master is run as go run mrmaster.go pg-*.txt. Each .txt file is one split, and each split becomes one map task handled by one map worker.

Workers are run as go run mrworker.go wc.so. Because several workers run in parallel and call the master concurrently, the master has to be thread-safe.

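A worker may crash halfway through writing a file and leave an incomplete output behind, so it should write to a temporary file created with ioutil.TempFile and, once the file is complete, os.Rename it to the target name.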
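The temp-file-then-rename pattern can be sketched as follows. This is not the post's code: writeFileAtomic and atomicWriteDemo are hypothetical helpers, and the sketch uses os.CreateTemp (Go 1.16+), which ioutil.TempFile now simply wraps. As in the post, the temp file is created in the same directory as the target.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFileAtomic writes data to a temporary file in dir and then renames it
// to dir/name, so readers see either no file or the complete file.
func writeFileAtomic(dir, name string, data []byte) error {
	// Create the temp file next to the target: renaming across
	// filesystems can fail.
	tmp, err := os.CreateTemp(dir, "mr-tmp-*")
	if err != nil {
		return err
	}
	// If anything below fails, remove the partial temp file. After a
	// successful rename the old name is gone and this call is harmless.
	defer os.Remove(tmp.Name())

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	// Close before renaming so that close errors are not silently lost.
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

// atomicWriteDemo writes content into a fresh directory and returns what was
// read back and how many files the directory holds afterwards.
func atomicWriteDemo(content string) (string, int, error) {
	dir, err := os.MkdirTemp("", "mr-demo-*")
	if err != nil {
		return "", 0, err
	}
	defer os.RemoveAll(dir)

	if err := writeFileAtomic(dir, "mr-out-0", []byte(content)); err != nil {
		return "", 0, err
	}
	out, err := os.ReadFile(filepath.Join(dir, "mr-out-0"))
	if err != nil {
		return "", 0, err
	}
	entries, err := os.ReadDir(dir)
	if err != nil {
		return "", 0, err
	}
	return string(out), len(entries), nil
}

func main() {
	got, n, err := atomicWriteDemo("hello 1\n")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q, %d file(s) in dir\n", got, n) // "hello 1\n", 1 file(s) in dir
}
```

Only one file remains after the write, which shows that the temporary name never outlives a successful rename.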

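Intermediate files are named mr-X-Y (X is the map task number, Y the reduce task number), and output files are named mr-out-X (X is the reduce task number).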
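Both naming schemes, together with the partitioning rule that decides which Y a key goes to, fit in a few lines. This sketch assumes the ihash helper as shipped in the lab's mr/worker.go skeleton (FNV-1a, masked to a non-negative int); intermediateName, outputName and partition are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash mirrors the helper from the lab skeleton: 32-bit FNV-1a,
// masked so the result is never negative.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// intermediateName names the file map task X writes for reduce task Y.
func intermediateName(mapTask, reduceTask int) string {
	return fmt.Sprintf("mr-%d-%d", mapTask, reduceTask)
}

// outputName names the final output of reduce task X.
func outputName(reduceTask int) string {
	return fmt.Sprintf("mr-out-%d", reduceTask)
}

// partition picks the reduce task for a key. Every map task uses the same
// function, so all occurrences of a key end up in the same reduce task.
func partition(key string, nReduce int) int {
	return ihash(key) % nReduce
}

func main() {
	const nReduce = 10
	for _, key := range []string{"apple", "banana", "apple"} {
		y := partition(key, nReduce)
		fmt.Println(key, "->", intermediateName(0, y), "->", outputName(y))
	}
}
```

Because partition depends only on the key and nReduce, every map task sends a given key to the same Y, which is what lets one reduce task see all of that key's values.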

Approach

master

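What the master has to take care of:

  • Keepalive: workers ping the master periodically, and the master must evict workers that have been offline for too long.
  • Hand out map tasks to map workers.
  • When a map worker finishes a map task, record that worker's output files.
  • Once the map work is done, gather the map outputs and assign reduce tasks to reduce workers.
  • Wait for all reduce workers to finish.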
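The post does not show the master's keepalive handler. A minimal sketch, assuming a Ping RPC that takes the same PingArgs (WorkerId, Time) that TaskRequest uses and an empty PingReply; the handler, PingReply and the offline helper are hypothetical. The 10-second timeout matches the one used in updateWorkers.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// PingArgs mirrors the fields the post's TaskRequest reads.
type PingArgs struct {
	WorkerId string
	Time     time.Time
}

// PingReply is assumed to carry nothing.
type PingReply struct{}

type wkerStatus struct {
	id         string
	lastOnline time.Time
}

// Master is trimmed down to the worker table.
type Master struct {
	sync.Mutex
	workers map[string]wkerStatus
}

// Ping records that a worker is alive. It stamps the master's own clock
// rather than rq.Time, so clock skew between machines does not matter.
func (m *Master) Ping(rq *PingArgs, rp *PingReply) error {
	m.Lock()
	defer m.Unlock()
	m.workers[rq.WorkerId] = wkerStatus{id: rq.WorkerId, lastOnline: time.Now()}
	return nil
}

// offline lists the workers not heard from within timeout.
func (m *Master) offline(timeout time.Duration) []string {
	m.Lock()
	defer m.Unlock()
	var ids []string
	for id, w := range m.workers {
		if time.Since(w.lastOnline) > timeout {
			ids = append(ids, id)
		}
	}
	return ids
}

func main() {
	m := &Master{workers: map[string]wkerStatus{}}
	m.Ping(&PingArgs{WorkerId: "w1", Time: time.Now()}, &PingReply{})
	// Pretend w2 last pinged 15 seconds ago.
	m.workers["w2"] = wkerStatus{id: "w2", lastOnline: time.Now().Add(-15 * time.Second)}
	fmt.Println(m.offline(10 * time.Second)) // [w2]
}
```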

worker

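Now, what the worker has to take care of:

  • Ping the master periodically.
  • When idle, ask the master whether there is any work: should I be a map worker or a reduce worker? If there is nothing to do, wait a moment and ask again.
  • Process the assigned task.
  • Notify the master when the task is done.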
Implementation

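As soon as the master starts, it enters MapPhase and hands out map tasks to incoming workers. When all map tasks are finished it moves to ReducePhase; when all reduce tasks are finished it moves to Done and exits.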

type ProgressStatus int
const (
	MapPhase ProgressStatus = iota + 1
	ReducePhase
	Done
)

The master also has to track the status of each task:

  • not yet assigned to a worker
  • a worker is working on it
  • finished
type taskStatus int
const (
	notAssign taskStatus = iota + 1
	doing
	finished
)

For each task, the master records which worker it was assigned to, its status, its input file paths, and its output file path(s):

type mapTask struct {
	bindWorker string
	ins        []string
	outs       map[int]string
	status     taskStatus
}

type reduceTask struct {
	bindWorker string
	ins        []string
	out        string
	status     taskStatus
}
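The post does not show how these task tables are filled in. A minimal sketch, assuming a hypothetical newTasks helper called from MakeMaster: one map task per input file, and nReduce reduce tasks whose inputs are appended later as map tasks report their intermediate files.

```go
package main

import "fmt"

type taskStatus int

const (
	notAssign taskStatus = iota + 1
	doing
	finished
)

type mapTask struct {
	bindWorker string
	ins        []string
	outs       map[int]string
	status     taskStatus
}

type reduceTask struct {
	bindWorker string
	ins        []string
	out        string
	status     taskStatus
}

// newTasks builds one map task per input split (one pg-*.txt file each)
// and nReduce reduce tasks with no inputs yet.
func newTasks(files []string, nReduce int) ([]mapTask, []reduceTask) {
	maps := make([]mapTask, len(files))
	for i, f := range files {
		maps[i] = mapTask{
			ins:    []string{f},
			outs:   make(map[int]string),
			status: notAssign, // must be explicit: the zero value is not notAssign
		}
	}
	reduces := make([]reduceTask, nReduce)
	for i := range reduces {
		reduces[i] = reduceTask{
			out:    fmt.Sprintf("mr-out-%d", i),
			status: notAssign,
		}
	}
	return maps, reduces
}

func main() {
	maps, reduces := newTasks([]string{"pg-a.txt", "pg-b.txt"}, 3)
	fmt.Println(len(maps), len(reduces), maps[1].ins, reduces[2].out)
	// 2 3 [pg-b.txt] mr-out-2
}
```

Because taskStatus starts at iota + 1, a zero-valued task has status 0 rather than notAssign. Without the explicit initialization, the task.status == notAssign check in TaskRequest would never match and no task would ever be handed out.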

The master's metadata also has to track worker status; the lastOnline field of wkerStatus records when each worker was last heard from:

type meta struct {
	nReducer    int
	mapTasks    []mapTask
	reduceTasks []reduceTask
	workers     map[string]wkerStatus
}

type wkerStatus struct {
	id         string
	lastOnline time.Time
}
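type Master struct {
	sync.Mutex // guards meta and phase; RPC handlers run concurrently

	meta  meta           // task tables and worker table
	phase ProgressStatus // MapPhase -> ReducePhase -> Done
}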

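The master periodically evicts offline workers. A worker whose last ping is more than 10 seconds old is considered offline, and any task it was working on goes back to notAssign so that another worker can pick it up: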

func (m *Master) updateWorkers() {
	m.Lock()
	defer m.Unlock()

	// Deleting map entries while ranging over the map is allowed in Go.
	for id, worker := range m.meta.workers {
		if time.Since(worker.lastOnline) <= 10*time.Second {
			continue
		}
		// The worker is offline: put whatever it was doing back in the queue.
		switch m.phase {
		case MapPhase:
			for i, task := range m.meta.mapTasks {
				if task.bindWorker == id && task.status == doing {
					m.meta.mapTasks[i].status = notAssign
					log.Printf("REMOVE worker %s from map task %d", id, i)
				}
			}

		case ReducePhase:
			for i, task := range m.meta.reduceTasks {
				if task.bindWorker == id && task.status == doing {
					m.meta.reduceTasks[i].status = notAssign
					log.Printf("REMOVE worker %s from reduce task %d", id, i)
				}
			}
		}
		// Evict the worker even if it held no task, so idle dead workers
		// do not stay in the table forever.
		delete(m.meta.workers, id)
	}
}
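The post says the master evicts offline workers periodically but does not show what calls updateWorkers. One possible sketch, assuming a ticker goroutine started from MakeMaster; sweepLoop, sweepDemo and the trimmed-down Master are hypothetical, and updateWorkers here only counts calls. The lab's mrmaster.go polls Done() once a second to decide when to exit, so the sweeper can use the same method to know when to stop.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type ProgressStatus int

const (
	MapPhase ProgressStatus = iota + 1
	ReducePhase
	Done
)

// Master is trimmed down to what this sketch needs.
type Master struct {
	sync.Mutex
	phase  ProgressStatus
	sweeps int // number of updateWorkers calls, for the demo only
}

// updateWorkers stands in for the real method; it only counts calls here.
func (m *Master) updateWorkers() {
	m.Lock()
	defer m.Unlock()
	m.sweeps++
}

// Done reports whether the whole job has finished.
func (m *Master) Done() bool {
	m.Lock()
	defer m.Unlock()
	return m.phase == Done
}

// sweepLoop calls updateWorkers every interval until the job is done.
func (m *Master) sweepLoop(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		if m.Done() {
			return
		}
		m.updateWorkers()
	}
}

// sweepDemo lets the sweeper run briefly, finishes the job, waits for the
// sweeper to notice, and returns how many sweeps happened.
func sweepDemo() int {
	m := &Master{phase: MapPhase}
	stopped := make(chan struct{})
	go func() {
		m.sweepLoop(5 * time.Millisecond)
		close(stopped)
	}()
	time.Sleep(30 * time.Millisecond)
	m.Lock()
	m.phase = Done
	m.Unlock()
	<-stopped
	m.Lock()
	defer m.Unlock()
	return m.sweeps
}

func main() {
	fmt.Println("sweeps before Done:", sweepDemo())
}
```

In the real master an interval of about a second would be an assumption worth tuning; it only needs to be comfortably shorter than the 10-second timeout.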

How the master handles a task request from a worker (TaskRequest):

func (m *Master) TaskRequest(rq *PingArgs, rp *TaskReply) error {
	log.Printf("got task request from %s %s\n", rq.WorkerId, rq.Time.Format(time.RFC3339))

	m.Lock()
	defer m.Unlock()

	switch m.phase {
	case MapPhase:
		for i, task := range m.meta.mapTasks {
			if task.status == notAssign {
				m.meta.mapTasks[i].status = doing
				m.meta.mapTasks[i].bindWorker = rq.WorkerId
				*rp = TaskReply{
					TaskType: MapWorker,
					Inputs:   task.ins,
					Nreduce:  m.meta.nReducer,
					TaskId:   i,
				}
				log.Printf("assign map task [%d]to %s , rp %#v \n ",
					i, rq.WorkerId, rp)
				return nil
			} else if task.status == doing {
				log.Printf("map task %s is doing\n", task.bindWorker)
			}
		}

	case ReducePhase:

		for i, task := range m.meta.reduceTasks {
			if task.status == notAssign {
				m.meta.reduceTasks[i].status = doing
				m.meta.reduceTasks[i].bindWorker = rq.WorkerId
				*rp = TaskReply{
					TaskType: ReduceWorker,
					Nreduce:  m.meta.nReducer,
					Inputs:   task.ins,
					TaskId:   i,
				}
				log.Printf("assign reduce task [%d]to %s \n ",
					i, rq.WorkerId)
				return nil
			} else if task.status == doing {
				log.Printf("reduce task %s is doing\n", task.bindWorker)
			}
		}
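	case Done:
		log.Println("works already done")
		rp.TaskType = WorkerExit
		// Return here: otherwise execution falls through to the
		// "nothing to do" code below, which overwrites WorkerExit with Wait,
		// and workers never exit.
		return nil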
	default:
		log.Fatalf("unknown master phase %d\n", m.phase)
	}
	log.Println("nothing to do wait")
	rp.TaskType = Wait
	return nil
}
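TaskRequest and TaskDone rely on RPC types that the post does not show. A plausible mr/rpc.go, reconstructed only from the fields the handlers use; the constant values, DoneReply and the roundTrip helper are assumptions. net/rpc encodes arguments and replies with encoding/gob, which only transmits exported fields, so every field is capitalized.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"time"
)

// TaskType tells a worker what to do next. Starting at iota + 1 keeps the
// zero value invalid, so a reply whose TaskType was never set hits the
// worker's default branch instead of being read as a map task.
type TaskType int

const (
	MapWorker TaskType = iota + 1
	ReduceWorker
	Wait
	WorkerExit
)

// PingArgs is sent with pings and task requests.
type PingArgs struct {
	WorkerId string
	Time     time.Time
}

// TaskReply describes the task handed to a worker.
type TaskReply struct {
	TaskType TaskType
	Inputs   []string
	Nreduce  int
	TaskId   int
}

// DoneArgs reports a finished task. For a map task, Files must hold exactly
// Nreduce paths, with Files[y] being the file for reduce task y.
type DoneArgs struct {
	WorkerId string
	WorkIdx  int
	Files    []string
}

type DoneReply struct{}

// roundTrip pushes a reply through encoding/gob, the codec net/rpc uses.
func roundTrip(in TaskReply) (TaskReply, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return TaskReply{}, err
	}
	var out TaskReply
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

func main() {
	out, err := roundTrip(TaskReply{
		TaskType: ReduceWorker,
		Inputs:   []string{"mr-0-1", "mr-1-1"},
		Nreduce:  10,
		TaskId:   1,
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}
```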

How the master handles a worker's task-done notification (TaskDone):

func (m *Master) TaskDone(rq *DoneArgs, rp *DoneReply) error {
	m.Lock()
	defer m.Unlock()

	switch m.phase {
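	case MapPhase:
		// If a task was reassigned after its first worker was evicted, both
		// workers may report it done; ignore the second report so its files
		// are not appended to the reduce inputs again.
		if m.meta.mapTasks[rq.WorkIdx].status == finished {
			log.Printf("[%s] [%d] duplicate map done ignored\n", rq.WorkerId, rq.WorkIdx)
			return nil
		}
		m.meta.mapTasks[rq.WorkIdx].status = finished
		log.Printf("[%s] [%d] map task Done\n", rq.WorkerId, rq.WorkIdx)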
		dones := 0
		for _, task := range m.meta.mapTasks {
			if task.status == finished {
				dones++
			}
		}
		if len(rq.Files) != m.meta.nReducer {
			log.Fatalf("request files count %d != %d\n",
				len(rq.Files), m.meta.nReducer)
		}

		for idxReduce, file := range rq.Files {
			m.meta.reduceTasks[idxReduce].ins =
				append(m.meta.reduceTasks[idxReduce].ins, file)
		}

		if dones == len(m.meta.mapTasks) {
			log.Println("go from MapPhase to ReducePhase")
			m.phase = ReducePhase
		} else {
			log.Printf("%d task doing\n", len(m.meta.mapTasks)-dones)
		}
	case ReducePhase:
		m.meta.reduceTasks[rq.WorkIdx].status = finished
		log.Printf("[%s] [%d] reduce task Done\n", rq.WorkerId, rq.WorkIdx)
		dones := 0
		for _, task := range m.meta.reduceTasks {
			if task.status == finished {
				dones++
			}
		}
		if dones == len(m.meta.reduceTasks) {
			m.phase = Done
			log.Println("leave ReducePhase")
		}

	case Done:
		log.Println("already Done !!!!!")
	default:
		log.Fatalf("unknown master phase %d\n", m.phase)

	}
	return nil
}
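TaskDone trusts rq.WorkIdx and does not check which worker is reporting. One way to harden the completion path, sketched as a hypothetical acceptDone helper that the handler could call before touching any state: reject out-of-range indices, repeated reports, and reports from a worker the task is no longer bound to (for example, a worker the master already evicted, whose task went back to notAssign). These checks are not part of the original code.

```go
package main

import "fmt"

type taskStatus int

const (
	notAssign taskStatus = iota + 1
	doing
	finished
)

type task struct {
	bindWorker string
	status     taskStatus
}

// acceptDone marks tasks[idx] finished and returns true only if the report
// is valid: the index exists, the task is still in progress, and the
// reporting worker is the one the task is currently bound to.
func acceptDone(tasks []task, idx int, workerId string) bool {
	if idx < 0 || idx >= len(tasks) {
		return false
	}
	t := &tasks[idx]
	if t.status != doing || t.bindWorker != workerId {
		return false // duplicate, reassigned, or never assigned
	}
	t.status = finished
	return true
}

func main() {
	tasks := []task{
		{bindWorker: "w1", status: doing},
		{bindWorker: "w2", status: notAssign}, // w2 was evicted
	}
	fmt.Println(acceptDone(tasks, 0, "w1")) // true
	fmt.Println(acceptDone(tasks, 0, "w1")) // false: already finished
	fmt.Println(acceptDone(tasks, 1, "w2")) // false: task was reset
	fmt.Println(acceptDone(tasks, 7, "w1")) // false: no such task
}
```

For map tasks, the handler would then append rq.Files to the reduce inputs only when acceptDone returns true.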

Now the worker side.

The worker asks the master for a task:

func (w *WorkerT) requestTask() (exit bool) {
	task, err := w.task()
	if err != nil {
		log.Fatalf("call task %s\n", err)
	}

	switch task.TaskType {
	case MapWorker:
		w.mapWork(task)
	case ReduceWorker:
		w.reduceWork(task)
	case Wait:
		time.Sleep(time.Second)
	case WorkerExit:
		return true
	default:
		log.Fatalf("unknown task type: %d %#v\n", task.TaskType, task)
	}
	return false
}

When the worker receives a map task, it runs it:

func (w *WorkerT) mapWork(task *TaskReply) {
	log.Println("do map")

	// Check for inputs before indexing, otherwise an empty list panics.
	if len(task.Inputs) == 0 {
		log.Fatalln("no inputs")
	}
	filename := task.Inputs[0]

	inputFile, err := ioutil.ReadFile(filename)
	if err != nil {
		log.Fatalf("read %s err %s\n", task.Inputs[0], err)
	}

	mapResults := w.mapf(filename, string(inputFile))
	outFileNameFunc := func(idxReduce int) string {
		return fmt.Sprintf("mr-%d-%d", task.TaskId, idxReduce)
	}
	var tmps = make(map[int]struct {
		*os.File
		*json.Encoder
	},
	)
	for i := 0; i < task.Nreduce; i++ {
		tempFile, err := ioutil.TempFile(w.currentWD, "*")
		if err != nil {
			log.Fatalf("create temp file %s\n", err)
		}
		tmps[i] = struct {
			*os.File
			*json.Encoder
		}{
			File:    tempFile,
			Encoder: json.NewEncoder(tempFile),
		}
	}

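	// Route every key/value pair to its reduce bucket; the same key always
	// hashes to the same bucket, so it lands in the same mr-X-Y file.
	for _, kv := range mapResults {
		idxReduce := ihash(kv.Key) % task.Nreduce
		if err := tmps[idxReduce].Encode(kv); err != nil {
			log.Fatalf("encode %s\n", err)
		}
	}
	// Close every temp file (and check the error) before renaming it.
	for _, tmp := range tmps {
		if err := tmp.Close(); err != nil {
			log.Fatalf("close temp file %s\n", err)
		}
	}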

	var intermediates = make([]string, task.Nreduce)

	for idxReduce, file := range tmps {
		path := outFileNameFunc(idxReduce)
		if err := os.Rename(file.Name(), path); err != nil {
			log.Fatalf("rename from %s to %s error %s\n",
				file.Name(), path, err)
		} else {
			log.Printf("idxReduce %d, %s Done\n", idxReduce, path)
		}
		intermediates[idxReduce] = path
	}
	log.Printf("worker finished job. MapPhase: %d \n", task.TaskId)
	w.taskDone(task.TaskId, intermediates)

}

When the worker receives a reduce task, it runs it:

func (w *WorkerT) reduceWork(task *TaskReply) {
	log.Println("do reduce")

	results := make(map[string][]string)
	for _, input := range task.Inputs {
		file, err := os.Open(input)
		if err != nil {
			log.Fatalf("read %s %s\n", input, err)
		}
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			var kv KeyValue
			if err := json.Unmarshal(scanner.Bytes(), &kv); err != nil {
				log.Fatalf("decode %s: %s\n", input, err)
			}
			results[kv.Key] = append(results[kv.Key], kv.Value)
		}
		if err := scanner.Err(); err != nil {
			log.Fatalf("scan %s: %s\n", input, err)
		}
		file.Close() // close each input once it has been consumed
	}
	// Check the error before using tmp.
	tmp, err := ioutil.TempFile(w.currentWD, "*")
	if err != nil {
		log.Fatalf("create temp file %s\n", err)
	}
	var kvs KeyValues
	for k, v := range results {
		reduceOut := w.reducef(k, v)
		kvs = append(kvs, KeyValue{
			Key:   k,
			Value: reduceOut,
		})
	}
	sort.Sort(kvs)
	for _, kv := range kvs {
		if _, err := fmt.Fprintf(tmp, "%v %v\n", kv.Key, kv.Value); err != nil {
			log.Fatalf("write temp %s\n", err)
		}
	}
	// Close the temp file before renaming it into place.
	if err := tmp.Close(); err != nil {
		log.Fatalf("close temp %s\n", err)
	}

	path := fmt.Sprintf("mr-out-%d", task.TaskId)
	if err := os.Rename(tmp.Name(), path); err != nil {
		log.Fatalf("rename from %s to %s  %s\n", tmp.Name(), path, err)
	}
	log.Printf("worker finished job, ReducePhase: %d, output: %s\n", task.TaskId, path)
	w.taskDone(task.TaskId, nil)
}
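reduceWork reads each intermediate file line by line with bufio.Scanner, whose default maximum token size is 64 KiB (bufio.MaxScanTokenSize), so an unusually long encoded record would stop the scan with an error. Since the map side writes records with json.Encoder, an alternative sketch reads them back with json.Decoder, which has no per-line limit. readKVs and summarize are hypothetical helpers; the sorted-by-key order follows the post.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"sort"
	"strings"
)

type KeyValue struct {
	Key   string
	Value string
}

// readKVs decodes a stream of JSON KeyValue records until EOF.
func readKVs(r io.Reader) ([]KeyValue, error) {
	dec := json.NewDecoder(r)
	var kvs []KeyValue
	for {
		var kv KeyValue
		err := dec.Decode(&kv)
		if err == io.EOF {
			return kvs, nil
		}
		if err != nil {
			return nil, err
		}
		kvs = append(kvs, kv)
	}
}

// summarize groups the records in input by key and returns the keys in
// sorted order together with the number of values for each key.
func summarize(input string) ([]string, []int, error) {
	kvs, err := readKVs(strings.NewReader(input))
	if err != nil {
		return nil, nil, err
	}
	groups := make(map[string][]string)
	for _, kv := range kvs {
		groups[kv.Key] = append(groups[kv.Key], kv.Value)
	}
	keys := make([]string, 0, len(groups))
	for k := range groups {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	counts := make([]int, len(keys))
	for i, k := range keys {
		counts[i] = len(groups[k])
	}
	return keys, counts, nil
}

func main() {
	input := `{"Key":"b","Value":"1"}
{"Key":"a","Value":"1"}
{"Key":"b","Value":"1"}
`
	keys, counts, err := summarize(input)
	if err != nil {
		panic(err)
	}
	fmt.Println(keys, counts) // [a b] [1 2]
}
```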

Testing

The official test-mr.sh script runs the following tests:

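  1. First, the wc test and the indexer test. Each builds the worker as a Go plugin with a different func Map(string, string) []mr.KeyValue and func Reduce(string, []string) string, and checks that the output is correct.
  2. The map parallelism test and the reduce parallelism test check whether map workers, and respectively reduce workers, really run at the same time.
  3. Finally, the crash test: maybeCrash() in crash.go makes a worker exit about one third of the time and sleep for up to 10 seconds another third of the time.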
func maybeCrash() {
	max := big.NewInt(1000)
	rr, _ := crand.Int(crand.Reader, max)
	if rr.Int64() < 330 {
		// crash!
		os.Exit(1)
	} else if rr.Int64() < 660 {
		// delay for a while.
		maxms := big.NewInt(10 * 1000)
		ms, _ := crand.Int(crand.Reader, maxms)
		time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
	}
}

Pitfalls / bugs I wrote

I wrote the lab almost in one go, but testing turned up two problems:

Map workers misrouted intermediate files

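After all workers had finished the tasks the master assigned, the test found duplicate keys in the final output files. This puzzled me: hadn't I already grouped the key-value pairs by key with a map in the reduce worker?
Then I realized that when a map worker finished and reported taskDone to the master, the []files slice it sent in the RPC did not have length nReduce. As a result, the master handed the intermediate files produced by map workers to the wrong reduce tasks, the same key reached more than one reduce task, and it therefore showed up in more than one mr-out-* file.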

An oversight kept the master from evicting offline workers

./test-mr.sh passed every test except the final crash test.

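I noticed that after some workers crashed, the master never evicted them. I immediately suspected updateWorkers, and on closer inspection found that I had not set the task's worker ID field (bindWorker) when assigning a task. So even after a worker went offline, task.bindWorker never equaled worker.id, offline workers were never evicted, and the program kept running until the test timed out.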
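The fix, reduced to its essentials: assignment must record bindWorker, otherwise the eviction loop can never match an offline worker to its task. A self-contained sketch with hypothetical assign, evict and evictScenario helpers and an injected clock; it also drops the worker from the table whether or not it held a task.

```go
package main

import (
	"fmt"
	"time"
)

type taskStatus int

const (
	notAssign taskStatus = iota + 1
	doing
	finished
)

type task struct {
	bindWorker string
	status     taskStatus
}

// assign gives the first unassigned task to workerId and records the
// binding; without the bindWorker line, evict below never matches anything.
func assign(tasks []task, workerId string) int {
	for i := range tasks {
		if tasks[i].status == notAssign {
			tasks[i].status = doing
			tasks[i].bindWorker = workerId
			return i
		}
	}
	return -1
}

// evict drops every worker silent for longer than timeout and puts its
// in-progress tasks back to notAssign. It returns how many tasks were reset.
func evict(tasks []task, lastOnline map[string]time.Time, now time.Time,
	timeout time.Duration) int {
	reset := 0
	for id, seen := range lastOnline {
		if now.Sub(seen) <= timeout {
			continue
		}
		for i := range tasks {
			if tasks[i].bindWorker == id && tasks[i].status == doing {
				tasks[i].status = notAssign
				reset++
			}
		}
		delete(lastOnline, id)
	}
	return reset
}

// evictScenario: w1 crashes (last ping 15 s ago) while w2 stays healthy,
// then a new worker w3 asks for work.
func evictScenario() (reset, reassigned int) {
	tasks := []task{{status: notAssign}, {status: notAssign}}
	assign(tasks, "w1") // task 0
	assign(tasks, "w2") // task 1
	now := time.Now()
	lastOnline := map[string]time.Time{
		"w1": now.Add(-15 * time.Second),
		"w2": now,
	}
	reset = evict(tasks, lastOnline, now, 10*time.Second)
	reassigned = assign(tasks, "w3")
	return reset, reassigned
}

func main() {
	reset, reassigned := evictScenario()
	fmt.Println("tasks reset:", reset, "w3 got task:", reassigned) // tasks reset: 1 w3 got task: 0
}
```

Passing now in as a parameter keeps evict deterministic, which makes it easy to reproduce the crash scenario without waiting 10 real seconds.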

Summary

This demo has a limitation: because the lab only requires a single master, it handles worker crashes but not master crashes.

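In the paper, however, the user program fork()s several processes, one of which becomes the master. For a new master to take over after the master crashes, the master's metadata (meta) would have to be serialized and persisted periodically, and the new master would deserialize it on startup.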
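A minimal checkpointing sketch, under the assumption that meta is mirrored into exported types before saving: encoding/json (like encoding/gob) only sees exported fields, so marshaling the post's meta directly would produce "{}". The checkpoint and taskState types and the save, load and checkpointDemo helpers are hypothetical; save reuses the temp-file-then-rename trick so a crash mid-save never leaves a truncated checkpoint.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// taskState and checkpoint are exported mirrors of mapTask/reduceTask/meta.
type taskState struct {
	Ins        []string
	Status     int
	BindWorker string
}

type checkpoint struct {
	NReducer    int
	Phase       int
	MapTasks    []taskState
	ReduceTasks []taskState
}

// save writes the checkpoint to a temp file and renames it into place.
func save(path string, c checkpoint) error {
	data, err := json.Marshal(c)
	if err != nil {
		return err
	}
	tmp, err := os.CreateTemp(filepath.Dir(path), "ckpt-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // harmless after a successful rename
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

// load reads the last checkpoint; a replacement master would call it on
// startup before serving RPCs.
func load(path string) (checkpoint, error) {
	var c checkpoint
	data, err := os.ReadFile(path)
	if err != nil {
		return c, err
	}
	err = json.Unmarshal(data, &c)
	return c, err
}

// checkpointDemo saves and reloads a small checkpoint in a temp directory.
func checkpointDemo() (checkpoint, error) {
	dir, err := os.MkdirTemp("", "mr-ckpt-*")
	if err != nil {
		return checkpoint{}, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "master.json")
	in := checkpoint{
		NReducer: 10,
		Phase:    1,
		MapTasks: []taskState{{Ins: []string{"pg-a.txt"}, Status: 2, BindWorker: "w1"}},
	}
	if err := save(path, in); err != nil {
		return checkpoint{}, err
	}
	return load(path)
}

func main() {
	c, err := checkpointDemo()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c)
}
```

The worker table with its lastOnline timestamps is deliberately left out: a new master has no heartbeat history, so resetting tasks that were in progress back to notAssign on recovery is probably the simplest policy.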


WRITTEN BY
EXEC
Evil EXEC