任务梳理
mapreduce 介绍及部署
lab1 简介
实现一个 mapreduce
的 demo,也就是在单机上部署的分布式词频统计系统。至于和实际生产场景上最大的区别就在于文件系统。在单机上部署共用一套文件系统,而实际生产端要复杂很多。
实际部署
完整的部署流程以及实验细节详见 6.824 Lab 1: MapReduce
串行化的部署流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15$ git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824
$ cd 6.824
$ ls
Makefile src
$
$ cd ~/6.824
$ cd src/main
$ go build -race -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run -race mrsequential.go wc.so pg*.txt
$ more mr-out-0
A 509
ABOUT 2
ACT 8
...mapreduce 部署流程
(1). 使用 go plugin 将map
和reduce
函数插件化
恶心的 go plugin,是个不成熟的方案,差点让我倒在了第一步,具体的踩坑情况见MIT6.824 Lab 踩坑记录1
$ go build -race -buildmode=plugin ../mrapps/wc.go
(2). 启动 coordinator
进程
1 | $ rm mr-out* |
(3). 开启多个终端窗口,启动多个 worker
进程
1 | $ go run -race mrworker.go wc.so |
- 最终测试结果展示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16$ bash test-mr.sh
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting job count test.
--- job count test: PASS
*** Starting early exit test.
--- early exit test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
系统设计
整个系统中 worker
和 coordinator
的交互状态包含下图所示的一系列情况:
RPC 通信
- 建立连接,给每个
worker
分配专属工号1
2
3
4
5
6
7
8// Ask for WorkerNum and Contruct Connection
type HelloArgs struct {
X string
}
type HelloReply struct {
Y int
}
HelloArgs
的 X
元素是双方约定好的字符串,coordinator
只有在收到约定的信息后才能建立连接,分配工号。
- map 任务的请求与完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// Map tasks Arguments and Reply
type MapArgs struct {
WorkerNum int
}
type MapReply struct {
NReducer int
Finished bool
FileName string
JobId int
}
// Finish Map Task
type FinishMapArgs struct {
X bool
WorkerNum int
FileName []string
JobId int
}
type FinishMapReply struct {
Y int
}
请求
map
任务worker
请求map
任务时带上自己的工号,如果被成功分配了map
任务,那么将会得到reducer
的数量、是否完成全部map
任务的信号、要执行的文件名称、被分配的事务Id
(用于命名中间临时文件,避免crash
)。发送结束
map
任务的信号worker
在自己的进程中执行完(或未能成功执行),将生成的临时文件名(地址)转告coordinator
。
- reduce 任务的请求与完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// Reduce Task
type ReduceTaskArgs struct {
WorkerNum int
}
type ReduceTaskReply struct {
Finished bool
ReducerFile []string
JobId int
}
// Finish Reduce Task
type FinishReduceArgs struct {
X bool
WorkerNum int
File string
}
type FinishReduceReply struct {
Y int
}
大体思路和 map
任务的请求和结束一直,不过多赘述。
- map 和 reduce 任务都完成,请求关机
1
2
3
4
5
6
7
8// Ask for shutdown
type ExitArgs struct {
X bool
}
type ExitReply struct {
Y bool
}
coordinator 进程
1 | type Coordinator struct { |
主要负责的流程如下:
- 接收
worker
的建立连接请求并发放工号; - 分配
map
任务:
- 建立待分配、待完成文件队列
- 给成功“接单”(成功从待分配队列中获取文件)的
worker
分配唯一的jobId
,并反馈是否已完成所有的map
任务 - 开启计时 goroutine,一旦对应的
map
任务超时,则将对应文件名从待完成队列放入待分配队列
- 分配
reduce
任务:
- 在完成所有
map
任务的前提下才会分配reduce
任务 - 其余流程与
map
任务类似
- shutdown:完成所有
reduce
任务后,worker
会收到结束信号,然后通过 RPC 请求关闭,当所有worker
都退出后,coordinator
才会关闭。
worker 进程
- 建立与
coordinator
的连接,获取工号; - 请求
map
任务,获得任务后开始执行,响应完成情况后继续请求map
任务直到再次获取或者所有map
任务都结束; - 所有
map
任务都结束后,请求reduce
任务,获得任务后开始执行,响应完成情况后继续请求reduce
任务直到再次获取reduce
任务或者所有reduce
任务结束; - 请求结束连接,关机。
一些细节
生成的
nReducer
个中间临时文件可以编码成json
的格式,方便处理,可以在打开的文件中写入:1
2
3enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)在打开的文件中读取:
1
2
3
4
5
6
7
8dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}worker
每次在结束任务后都要重复请求任务(心跳),直到获得任务全部结束的信号。通过维护
待分配
和待完成
两个队列,可以控制任务的发放与map
、reduce
两个大任务的完成情况。可以通过生成临时文件保存临时结果的方式避免因为超时导致的
crash
。保存的文件可以带上唯一的
jobId
防止crash
。coordinator
对任务的超时控制:采用了一个 select 的结构,第一个分支是超时上下文ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
来控制,如果超时,除了处理对应的两个队列外,还需要将对应工号的rejectResult
置为true
,防止worker
在规定时间内完成了任务,但是因为 PRC 通信的延迟导致超时出现的coordinator
和worker
对临时文件处理不一致的情况;第二个分支是对应工号的workerDone
,coordinator
在规定时间内接收到正确的完成信号后,就会向该无缓冲通道内发送数据,从而选择这一分支,表示任务顺利完成。对锁和原子数据的使用上要注意,避免
data race
(编译时通过-race
进行竞态检测)。
项目介绍
项目地址
关键项目文件结构
1 | |- doc |
- Post title:MIT6.824 Lab1
- Post author:洪笳淏
- Create time:2022-04-06 12:36:00
- Post link:https://jiahaohong1997.github.io/2022/04/06/MIT6.824 Lab1/
- Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.