MIT6.824 Lab1
洪笳淏 Lv4

任务梳理

mapreduce 介绍及部署

avatar

lab1 简介

  实现一个 mapreduce 的 demo,也就是在单机上部署的分布式词频统计系统。至于和实际生产场景上最大的区别就在于文件系统。在单机上部署共用一套文件系统,而实际生产端要复杂很多。

实际部署

  完整的部署流程以及实验细节详见 6.824 Lab 1: MapReduce

  1. 串行化的部署流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    $ git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824
    $ cd 6.824
    $ ls
    Makefile src
    $
    $ cd ~/6.824
    $ cd src/main
    $ go build -race -buildmode=plugin ../mrapps/wc.go
    $ rm mr-out*
    $ go run -race mrsequential.go wc.so pg*.txt
    $ more mr-out-0
    A 509
    ABOUT 2
    ACT 8
    ...
  2. mapreduce 部署流程
    (1). 使用 go plugin 将 mapreduce 函数插件化
    恶心的 go plugin,是个不成熟的方案,差点让我倒在了第一步,具体的踩坑情况见MIT6.824 Lab 踩坑记录

    1
    $ go build -race -buildmode=plugin ../mrapps/wc.go

(2). 启动 coordinator 进程

1
2
$ rm mr-out*
$ go run -race mrcoordinator.go pg-*.txt

(3). 开启多个终端窗口,启动多个 worker 进程

1
$ go run -race mrworker.go wc.so
  1. 最终测试结果展示
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    $ bash test-mr.sh       
    *** Starting wc test.
    --- wc test: PASS
    *** Starting indexer test.
    --- indexer test: PASS
    *** Starting map parallelism test.
    --- map parallelism test: PASS
    *** Starting reduce parallelism test.
    --- reduce parallelism test: PASS
    *** Starting job count test.
    --- job count test: PASS
    *** Starting early exit test.
    --- early exit test: PASS
    *** Starting crash test.
    --- crash test: PASS
    *** PASSED ALL TESTS

系统设计

  整个系统中 workercoordinator 的交互状态包含下图所示的一系列情况:
avatar

RPC 通信

  1. 建立连接,给每个 worker 分配专属工号
    1
    2
    3
    4
    5
    6
    7
    8
    // Ask for WorkerNum and Contruct Connection  
    type HelloArgs struct {
    X string
    }

    type HelloReply struct {
    Y int
    }

  HelloArgsX 元素是双方约定好的字符串,coordinator 只有在收到约定的信息后才能建立连接,分配工号。

  1. map 任务的请求与完成
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // Map tasks Arguments and Reply  
    type MapArgs struct {
    WorkerNum int
    }

    type MapReply struct {
    NReducer int
    Finished bool
    FileName string
    JobId int
    }

    // Finish Map Task
    type FinishMapArgs struct {
    X bool
    WorkerNum int
    FileName []string
    JobId int
    }

    type FinishMapReply struct {
    Y int
    }
  • 请求 map 任务
    worker 请求 map 任务时带上自己的工号,如果被成功分配了 map 任务,那么将会得到 reducer 的数量、是否完成全部 map 任务的信号、要执行的文件名称、被分配的事务 Id(用于命名中间临时文件,避免 crash)。

  • 发送结束 map 任务的信号
    worker 在自己的进程中执行完(或未能成功执行),将生成的临时文件名(地址)转告 coordinator

  1. reduce 任务的请求与完成
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // Reduce Task  
    type ReduceTaskArgs struct {
    WorkerNum int
    }

    type ReduceTaskReply struct {
    Finished bool
    ReducerFile []string
    JobId int
    }

    // Finish Reduce Task
    type FinishReduceArgs struct {
    X bool
    WorkerNum int
    File string
    }

    type FinishReduceReply struct {
    Y int
    }

  大体思路和 map 任务的请求和结束一直,不过多赘述。

  1. map 和 reduce 任务都完成,请求关机
    1
    2
    3
    4
    5
    6
    7
    8
    // Ask for shutdown  
    type ExitArgs struct {
    X bool
    }

    type ExitReply struct {
    Y bool
    }

coordinator 进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Coordinator struct {  
// Your definitions here.
fileName []string // 初始文件名列表,待分配队列
unfinishedFile []string // map任务进行中的文件名列表,待完成队列
nReducer int
lock sync.Mutex
jobNum int // 事务 id,递增不重复

workerNum int // worker 编号,递增不重复
workerDone []chan int // 对应 worker 的任务执行情况
rejectResult []bool // worker 成功完成任务,但因为通信到达后超时,拒绝该任务

mapFinished bool // map 完成才能开启 reduce
reduceFiles [][]string // [nReducer][]string,待分配队列
unFinishedReduceFile [][]string // reduce 正在进行中的文件列表,待完成队列
finalFiles []string // [nReducer]string,最终文件

exitValue atomic.Value // 原子量,保证 coordinator 正常关机
}

主要负责的流程如下:

  1. 接收 worker 的建立连接请求并发放工号;
  2. 分配 map 任务:
  • 建立待分配、待完成文件队列
  • 给成功“接单”(成功从待分配队列中获取文件)的 worker 分配唯一的 jobId,并反馈是否已完成所有的 map 任务
  • 开启计时 goroutine,一旦对应的 map 任务超时,则将对应文件名从待完成队列放入待分配队列
  1. 分配 reduce 任务:
  • 在完成所有 map 任务的前提下才会分配 reduce 任务
  • 其余流程与 map 任务类似
  1. shutdown:完成所有 reduce 任务后,worker 会收到结束信号,然后通过 RPC 请求关闭,当所有 worker 都退出后,coordinator 才会关闭。

worker 进程

  1. 建立与 coordinator 的连接,获取工号;
  2. 请求 map 任务,获得任务后开始执行,响应完成情况后继续请求 map 任务直到再次获取或者所有 map 任务都结束;
  3. 所有 map 任务都结束后,请求 reduce 任务,获得任务后开始执行,响应完成情况后继续请求 reduce 任务直到再次获取 reduce 任务或者所有 reduce 任务结束;
  4. 请求结束连接,关机。

一些细节

  1. 生成的 nReducer 个中间临时文件可以编码成 json 的格式,方便处理,可以在打开的文件中写入:

    1
    2
    3
    enc := json.NewEncoder(file)
    for _, kv := ... {
    err := enc.Encode(&kv)

    在打开的文件中读取:

    1
    2
    3
    4
    5
    6
    7
    8
    dec := json.NewDecoder(file)
    for {
    var kv KeyValue
    if err := dec.Decode(&kv); err != nil {
    break
    }
    kva = append(kva, kv)
    }
  2. worker每次在结束任务后都要重复请求任务(心跳),直到获得任务全部结束的信号。

  3. 通过维护待分配待完成 两个队列,可以控制任务的发放与 mapreduce 两个大任务的完成情况。

  4. 可以通过生成临时文件保存临时结果的方式避免因为超时导致的 crash

  5. 保存的文件可以带上唯一的 jobId 防止 crash

  6. coordinator 对任务的超时控制:采用了一个 select 的结构,第一个分支是超时上下文 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 来控制,如果超时,除了处理对应的两个队列外,还需要将对应工号的 rejectResult 置为 true,防止 worker 在规定时间内完成了任务,但是因为 PRC 通信的延迟导致超时出现的 coordinatorworker 对临时文件处理不一致的情况;第二个分支是对应工号的 workerDonecoordinator 在规定时间内接收到正确的完成信号后,就会向该无缓冲通道内发送数据,从而选择这一分支,表示任务顺利完成。

  7. 对锁和原子数据的使用上要注意,避免 data race(编译时通过 -race 进行竞态检测)。

项目介绍

项目地址

GitHub-MIT6.824

关键项目文件结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
|- doc
|- src
|- main
|- mrsequential.go
|- mrworker.go
|- mrcoordinator.go
|- test-mr.sh
|- mr
|- coordinator.go // coordinator 控制逻辑
|- rpc.go // rpc 通信结构
|- worker.go // worker 控制逻辑
|- mrapps
|- wc.go
|- indexer.go
  • Post title:MIT6.824 Lab1
  • Post author:洪笳淏
  • Create time:2022-04-06 12:36:00
  • Post link:https://jiahaohong1997.github.io/2022/04/06/MIT6.824 Lab1/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.
 Comments