目录
🌹资源链接
🍔调试技巧
🦅2 Programming Model
🌼2.1 Example
🌼2.2 Types
🌼2.3 More Examples
🦅3 Implementation(实现)
🌼3.1 ~ 3.3
🌼3.4 ~ 3.6
🦅4 Refinements(改进)
🌼4.1 ~ 4.5
🌼4.6 ~ 4.9
🦅5 Performance
🦅6 Experience
🌹资源链接
介绍:nil.csail.mit.edu/6.824/2021/notes/l01.txt
论文:rfeet.qrk (mit.edu)
视频:Lecture 1: Introduction (youtube.com)
中文视频:Lecture 1- Introduction_哔哩哔哩_bilibili
付费视频:simviso-开源分享,传播知识 (simtoco.com)
Go 入门:Go 语言之旅 (go-zh.org) -- 花了 7 个小时,3 天时间
作业提交:6.824 Lab 1: MapReduce (mit.edu)
先看论文,再看视频,最后做实验
🍔调试技巧
1) 用 DPrintf 代替 log.Printf
2) defer 延迟输出(FIRST IN LAST OUT)
3) go run your_program.go > output.log
4) grep "Error" output.log5) 竞态检测
go build -race my_program.go // 生成 .exe
./my_program // 运行
↓↓↓或直接运行↓↓↓
go run -race my_program.go6) 主循环中无限循环 或 提前退出
7) 死锁(多个routine获取同一个锁) 或 锁的粒度
8) 通道 I/O(多个routine向同一个通道发送信息,且缺少同步机制)
🦅2 Programming Model
🌼2.1 Example
0)伪代码
1)map
// MapReduce 库中的 Map 函数
void map(const string& key, const string& value) {// key: document name// value: document content// ("document1", "hello world hello")istringstream iss(value); // 字符串流, 从文本提取单词string word;while (iss >> word)// 输出键值对cout << "EmitIntermediate(" << word << ", \"1\")" << endl;
}
EmitIntermediate(hello, "1")
EmitIntermediate(world, "1")
EmitIntermediate(hello, "1")
2)reduce
// MapReduce 库中的 Reduce 函数
void reduce(const string& key, const vector<string>& values) {// key: a word// value: a list of counts// ("hello", {"1", "1"})int result = 0; // 单词出现次数for (size_t i = 0; i < values.size(); ++i)// atoi("..."): "+211" --> 211// 字符串转 int// c_str 转 C风格字符串result += atoi(values[i].c_str());// 模拟发出最终结果cout << "Emit(" << key << ", " << result << ")" << endl;
}
Emit(hello, 2)
🌼2.2 Types
输入的键 / 值(原始数据集) && 输出的键 / 值 ———— 不同域
中间的键 / 值(Map处理后生成) && 输出的键 / 值(Reduce处理后) ———— 相同域
"document1", "hello world hello" // 输入的键/值"hello", "1" // 中间的键/值
"world", "1"
"hello", "1""hello", "2" // 输出的键/值
Map函数:处理输入数据,生成中间键值对。中间键值对被 MapReduce框架收集并排序,传递给 Reduce 函数
Reduce函数:接受中间键值对,以及与之关联的的中间值列表,然后对这些值进行聚合操作,生成输出键值对
上述结构,允许 MapReduce 框架处理大规模数据集,通过分布式并行处理数据,提高数据处理效率和可扩展性
总而言之,字符串传递给用户所定义的函数,用户所定义的函数负责将字符串转化成合适的类型
🌼2.3 More Examples
1)用户程序 User Program:启动 MapReduce 的程序,定义了 Map 和 Reduce 函数。
2)主节点 Master:将输入数据拆分成多个片段(splits),并将这些 splits 分配给不同的工作节点(workers)。
3)工作节点 workers:实际数据处理的节点。分两种,执行 Map 函数 || 执行 Reduce 函数。每个工作节点从主节点接受任务,并在本地执行。
4)输入文件 Input files:存储在分布式系统 GFS。
5)Map 阶段 Map phase:workers 执行 Map 函数,根据输入数据生成中间键值对(Intermediate key/value pairs)。中间键值对会被写入本地磁盘,并按键(key)分区。
6)本地写入 Local write:Map 工作节点将中间数据写入本地磁盘。
7)远程读取 Remote read:Reduce 工作节点通过网络从 Map 工作节点的本地磁盘中读取中间数据。
8)Reduce 阶段:Reduce 工作节点执行 Reduce 函数,处理相同键的所有值,并将结果写入输出文件。
🦅3 Implementation(实现)
🌼3.1 ~ 3.3
3.1 Execution Overview
1,对中间数据排序,是为了让所有具有相同键的值都被聚合到一起。
2,workers 和 Master 都是用户程序启动的多个副本。
3,Map函数产生的中间键/值对在内存中缓冲,这些 缓冲对 在本地磁盘上的位置被传回给主节点,主节点负责将这些位置转发给Reduce工作者
4,当所有的Map任务和Reduce任务都完成后,主节点唤醒用户程序
3.2 Master Data Structures
1,主节点维护着多个数据结构。对于每个Map任务和Reduce任务,它存储着状态(空闲、进行中或已完成)
2,主节点是中间文件区域位置从 Map 任务传播到 Reduce 任务的渠道
3,因此,对于每个完成的 Map 任务,主节点存储着该 Map 任务产生的 R 个中间文件的位置和大小。
随着 Map 任务的完成,这些位置和大小信息会更新。这些信息会逐渐推送给正在进行Reduce任务的工作者。
3.3 Fault Tolerance
工作者故障:
(1)主节点定期对每个工作者进行 ping 操作。如果在一定时间内没有收到工作者的响应,主节点将该工作者标记为失败。
(2)由该工作者完成的任何 Map 任务都会重置回它们的初始空闲状态,因此有资格在其他工作者上调度。
(3)类似地,任何在失败的工作者上进行中的Map任务或Reduce任务也会被重置为空闲,并有资格重新调度。
(4)由于完成的Map任务的输出存储在失败机器的本地磁盘上,因此无法访问,所以失败时需要重新执行。由于完成的Reduce任务的输出存储在全局文件系统中,因此不需要重新执行。
(5)主节点需要定期写入 checkpoints,以便它 dead 后,可以恢复到最后一个 checkpoint 的状态
Semantics in the Presence of Failures
故障下的语义👇
1,确定性函数和输出一致性
MapReduce 中 Map函数 和 Reduce函数是确定的话,意味着,相同的输入,总是产生相同的输出
那么,无论是否发生故障,最终的输出都一样
2,原子提交和临时文件
(1)“原子”:操作要么完全完成,要么不发生,不会处于中间状态。
(2)MapRedeuce中,以原子提交的方式来保存任务的输出👇
任务的输出数据,首先被写入一个临时文件,然后一次性将该临时文件重命名为最终输出文件
这个重命名操作是原子的(确保的输出文件要么完全不可见,要么完全可见,不会出现输出文件部分更新的情况)
(3) MapReduce 中,每个任务的输出受限被写入一个临时文件(类似草稿纸),在任务完成得最终输出之前,现在 “草稿纸” 打草稿,可以防止数据丢失。
🌼3.4 ~ 3.6
3.4 本地性(Locality):
1)MapReduce 利用 GFS(Goole File System)系统存储数据。
2)GFS系统将大文件切分成一小块一小块,每块约 64 MB 大小,并且会在不同机器上保存这些数据块的副本。
3)任务开始时,主节点优先将任务分配给 已经存储了数据块 的机器,如果没有已经存储的数据块的机器,主节点就会将任务优先分配给, 离数据块存储位置比较近 的机器。
4)👆得益于上述策略,数据一般在本地机器读取,所以不需要通过网络来 传输数据 ,节省了网络带宽。
5)本地性,即,尽可能利用本地资源来处理数据,以减少对网络带宽的需求,提高数据处理效率。
3.5 任务粒度(Task Granularity)
MapReduce 中任务粒度,由两个参数决定
任务粒度过大 -- 某些机器很快完成任务变空闲,而其他机器还在忙碌,造成资源浪费
任务粒度过小 --
a. 任务分配更均匀,提高整体效率
b. 某台机器失败后,任务粒度较小,处理的任务量较小,所以可以迅速在其他机器重启
c. 可能导致任务调度开销增加
3.6 备份任务(Backup Tasks):
1)分布式计算环境中,由于 机器故障,资源竞争等原因,某些任务执行非常慢,这些执行缓慢的任务称为 “落后者”。
2)所以 MapReduce 引入了备份任务机制。
3)功能:
a. 识别落后者:执行时间超出平均时间,确定为 “落后者”
b. 调度备份任务:一旦识别出 “落后者”,主节点为 “落后者” 进行备份(除了原本正在执行的任务,相同的任务会在另一台机器执行)
c. 资源利用:开销不大,1000台机器,只需额外的几台机器执行备份
d. 任务完成:原始 OR 备份任务,一个完成 == 完成
e. 👆以上机制,显著减少了 “落后者” 导致的整个任务的拖延,使得大型 MapReduce 操作完成时间减少到 50%
f. 容错性:备份任务的存在,避免了单点故障导致的完全失败
🦅4 Refinements(改进)
🌼4.1 ~ 4.5
4.1 Partitioning Function(分区函数)
1)分区函数:
用来决定如何将 Map 阶段产生的 中间键值对(Intermediate key/value pairs) 分配给不同的 Reduce 任务。
这个过程称为 分区(Partitioning)(将大量数据切分成更小的块,以便并行处理)
2)作用:
a. 负载均衡:使得 Reduce 任务获得大致相同数量的数据,从而避免某些任务过载而其他任务空闲的情况。
b. 数据局部性:合理的分区策略,使得数据在 物理上 更接近于处理它的任务,减少数据在网络中的传输。
3)默认分区函数:
默认分区函数基于哈希。
比如,使用 hash(key) mod R 策略,hash(key) 是键的哈希值,R 是Reduce任务的数量
4.3 Combiner Function
reduce函数的输出被写入最终的输出文件
combiner函数的输出被写入一个中间文件,该文件将被发送到reduce任务
4.4 Input and Output Types
- 支持多种格式
MapReduce库支持多种输入数据格式,例如“文本”模式,其中每行作为一个键/值对,键是文件中的偏移量,值是行的内容- 输入分割
每种输入类型知道如何将自己分割成有意义的范围,以便作为单独的Map任务处理,例如文本模式确保仅在行边界分割
4.5 Side-effects
1)确定性:任务的输出只和它的输入有关,和执行顺序和外部环境无关
2)原子性:先将所有数据写入临时文件,成功后,原子性的将临时文件命名为最终输出文件,保证任务过程不会留下损坏的文件
3)幂等性:任务重新执行(多次执行),输出文件一致
🌼4.6 ~ 4.9
4.6 Skipping Bad Records
1)MapReduce库检测哪些记录导致确定性崩溃,并跳过这些记录以取得进展
2)每个工作进程安装了一个信号处理程序,用于捕获段错误和总线错误
3)Map或Reduce操作之前,MapReduce库将参数的序列号存储在全局变量
4)用户代码生成信号,信号处理程序,发送一个包含序列号的“临终”UDP数据包,到MapReduce 主节点5)主节点记录多次失败后,下次 Map 或 Reduce 任务就会跳过该节点
4.7 Local Execution
1)Map 或 Reduce 函数很难调试,因为分布式通常有几千台机器
2)所以我们通过一个特殊标志,可以直接使用 gdb 等调试/测试工具
4.8 Status Information
1)主节点运行一个HTTP服务器,导出一组页面让用户使用
2)页面显示:已完成任务,进行中任务,输入字节数,中间数据字节数,输出字节数,处理速率等信息(以便增加资源 或 优化代码)
3)还显示了:失败工作节点,失败时执行的任务(以便调试代码)
4.9 Counters
1)创建计数器对象,用于统计各种事件的发生次数
2)各个工作节点上的计数器值会定期发送回主节点,并由主节点聚合这些值,避免重复计数,并在作业完成后返回给用户代码
// 声明一个计数器对象指向uppercase
Counter* uppercase;
// 通过GetCounter函数获取名为"uppercase"的计数器
uppercase = GetCounter("uppercase");
// 定义Map函数处理输入
map(String name, String contents): // 遍历输入内容中的每个单词for each word w in contents: // 如果单词是大写开头的if (IsCapitalized(w)): // 对应的计数器increment操作,增加计数uppercase->Increment(); // 调用EmitIntermediate函数,发出中间键值对EmitIntermediate(w, "1");
🦅5 Performance
(1)
排序 sort 任务需要将中间输出写入本地磁盘是因为Map任务生成的数据量很大,需要在本地磁盘上进行聚合和排序,以便于后续的Reduce任务处理
(2)
而 grep 任务由于其输出数据量小,可以直接在内存中处理或通过网络传输给Reduce任务,无需额外的磁盘I/O操作
(3)
备份是为了解决“落后者”(主节点识别“落后者”,并为它备份--相同任务在另一台机器执行,避免了单点故障导致的完全失败)
(4)
备份后,即使在有意引入机器故障的情况下,MapReduce程序也能够有效地恢复并完成执行,Figure 3(c)就是,即使 kill 了 200 个进程,重新执行这些 Map 任务也很快,只比正常执行时间多了 5%
🦅6 Experience
(1)应用领域
- 大规模机器学习问题
- Google新闻和Froogle产品的数据聚类问题
- 提取用于生成热门查询报告(例如Google Zeitgeist)的数据
- 提取新实验和产品使用的网页属性(例如从大量网页语料库中提取地理位置以进行本地化搜索)
- 大规模图计算
(2)成果
MapReduce之所以如此成功,是因为它允许开发人员在半小时内在一千台机器上简单编写并高效运行程序,大大加快了开发和原型设计周期
(3)大规模索引(Large-Scale Indexing)
a. 因为处理容错、分布和并行化的代码隐藏在 MapReduce 库中,当使用MapReduce表达时,从 3800 行的 C++ 代码减少到 700 行
b. 大多数由机器故障、慢机器和网络问题引起的问题都由MapReduce库自动处理,无需操作员干预,极大提高了索引过程的性能