UESTC大数据分析与智能计算期末复习2
本文最后更新于:2 年前
这里是期末对于大数据计算课程的复习嗷!!!
Hadoop生态系统
- 系统架构:
- 部署在低成本的Intel/Linux硬件平台上
- 由多台装有Intel x86处理器的服务器或PC机组成
- 通过高速局域网构成一个计算集群
- 各个节点上运行Linux操作系统
- 三大主要模式:
- 单机模式(standalone mode)
- 虚拟分布模式(pseudo-distributed mode)
- 完全分布模式(completely distributed Mode) -> 这个实际生产过程中用的最多嗷!!!
- 集群配置:
- 硬件配置:
- NameNode(执行作业调度、资源调配、系统监控等任务)
- DataNode(承担具体的数据计算任务)
- 软件配置:
- Linux O/S
- JDK 1.6以上版本
- SSH(Security Shell)安全协议
- 网络配置:
- NameNode 到机架 (Rack) 的网络连接
- 机架内部的DataNode之间的网络连接
- 硬件配置:
- 集群软件配置:
- 主节点运行的程序或进程:
- 主节点程序Namenode
- Jobtracker 守护进程
- 管理集群所用的Hadoop 工具程序和集群监控浏览器
- 从节点运行的程序:
- 从节点程序Datanode
- 任务管理进程Tasktracker
- 区别:
- 主节点程序提供 Hadoop 集群管理、协调和资源调度功能。
- 从节点程序主要实现 Hadoop 文件系统(HDFS)存储功能和节点数据处理功能。
- 主节点运行的程序或进程:
- Hadoop生态系统:
- 基于HDFS / HBase的数据存储系统
- 基于YARN / Zookeeper的管理调度系统
- 支持不同计算模式的处理引擎
- 生态体系:
- 总体结构:
- HDFS -> 分布式文件系统,高容错,高吞吐,常用
- MapReduce -> 分布式批处理计算模型(Mapper,Reducer)
- Yarn -> 海量数据运算时的资源调度
- Spark -> 大规模数据快速处理通用的计算引擎,提供大量的 库Spark Core、Spark SQL、Spark Streaming、MLlib、
GraphX等 - Flume:高可用,高可靠,分布式的海量日志采集,聚合,传输的系统,用于做数据采集
- Kafka:分布式消息发布/订阅系统,与Spark Streaming一起完成实时业务计算
- Hive/Pig:Hive是数据仓库工具,结构化数据映射为数据表。Pig是Hadoop客户端软件,hadoop集群连接进行数据分析工作。
- Hbase:面向列的分布式数据库。适合存储非结构化数据。
- Redis:可基于内存也可以持久化的日志型、Key-Value数据库。往往用来缓存key-value类型的小表数据。
- Oozie:一个可扩展的workflow系统,用于协调多个MapReduce作业 的执行
- 数据存储系统:
- 组成:
- HDFS -> 分布式文件系统
- Hbase -> 分布式非关系型数据库
- Hive和Pig -> 数据仓库以及数据分析工具
- Sqoop和Flume -> 数据采集,转移和汇总的工具
- HDFS文件系统构成了Hadoop存储体系的基础。
- 组成:
- 管理调度系统:
- Zookeeper:分布式协调管理服务
- Oozie:作业调度
- Ambari:集群配置,管理和监控
- Chukwa:大型集群监控系统
- YARN:集群资源调度管理系统
- HDFS分布式文件系统:
- 结构:
- 物理存储资源和对象分散在网络相连的远程节点上
- 主控服务器(也称元数据服务器):负责管理命名空 间和文件目录
- 远程数据服务器(也称存储服务器)节点:存储实际 文件数据
- 特点:
- 透明性
- 高可用性
- 支持并发访问
- 可扩展性
- 安全性
- 结构:
- HDFS体系结构:
- 唯一主节点:运行NameNode,JobTracker,Zookeeper, Hmaster等负责集群管理、资源配置、作业调度的程序
- 多个从节点(dataNode):承担数据存储及计算任务。
- 客户端(Client):用于支持客户操作HDFS
- HDFS架构:
- Master/Slave架构,设置一个主节点
- 优点:
- 简化了系统设计
- 元数据管理和调配容易
- 缺点:
- 命名空间的限制
- 性能瓶颈
- 单点失效(SPOF)的问题
- HDFS存储结构:
- 以块(block)为基本单位存储文件
- 每个文件被划分成64MB大小的多个blocks,属于同一个 文件的blocks分散存储在不同DataNode上;
- 出于系统容错需要,每一个block有多个副本(replica), 存储在不同的DataNode上;(三个,其中两个在同一个机架上,剩余的一个在其他的机架上)
- 每个DataNode上的数据存储在本地的Linux文件系统中。
HDFS存储结构优势:
- 有利于大规模文件存储
- 适合数据备份
- 系统设计简化
HDFS命名空间管理:
- 命名空间包括目录,文件和块
- 文件 -> block -> 节点的映射关系作为元数据存储在NameNode上
- 整个HDFS集群只有一个命名空间,由Master管理
- HDFS使用传统的分级文件管理体系。
- NameNode进程使用FsImage和EditLog对于命名空间进行管理。
FsImage:
- 存储和管理内容:文件系统目录树
- 目录树钟所有文件和文件夹的元数据
- NameName进程 把 文件 -> block -> 节点映射关系表装载并保留在内存中。
EditLog:NameNode启动后对于文件系统改动操作的记录。
第二NameNode:对于HDFS数据信息的备份,减少NameNode重启的时间,一般独立部署在一台机器上。工作流程:Roll edits -> Retrieve FsImage and edits from NameNode -> Merge -> Transfer checkpoint to NameNode -> Roll again
HDFS文件读写机制:
- 主要访问机制:HDFS shell命令,HDFS Java API
HDFS读文件流程:
- 打开文件 -> 获取块信息 -> 读取请求 -> 读取请求 -> 读取下一个数据块 -> 关闭文件
HDFS写文件流程:
- 创建文件 -> 建立文件元数据(映射关系) -> 写入请求 -> 写入数据包 -> 接收确认包 -> 关闭文件 -> 结束过程 -> 通知NameNode节点关闭文件
- HDFS数据容错和回复:
- 多副本方式进行冗余存储:加快数据传输,检查数据错误容易,保证数据可用性。
- 机架感知副本存放策略:防止数据的可靠性,可用性和宽带的利用率。防止某一机架失效时数据丢失。利用机架内的高宽带特性提高读取速度。
- 错误检测和恢复机制:包括NameNode检测,DataNode检测和数据错误检测。
- 副本存放:
- block1放到与客户端同一机架的一个节点;block2放到block1所在 机架之外的节点;block3放在与block2同 一机架的另一节点。
- 副本策略:
- 读取流程:HDFS提供了一个API可以确定某一数据节点所属的机架ID -> 客户端从NameNode获得不同副本的存放位置列表 -> 调用API确定这些数据节点所属的机架ID -> 发现ID匹配:优先读取该数据节点存放的副本 -> 没有发现:随机选择一个副本读取数据
- 错误检测和恢复机制:
- NameNode检测:第二名称节点
- DataNode检测:心跳检测
- 数据错误检测:CRC循环校验
- 心跳检测机制:
DataNode周期性的向集群NameNode发送心跳包和块报告
分布式存储架构(HBase):
- HBase集群部署
物理部署:Hadoop集群
软件部署:四大组件:Master , Region Server , Zookeeper , Client
Hbase相关基本概念:
Region:
- 按照RowKey划分成的子表
- 数据表在集群中存储的最小单位
- 可以被分配到Region Server进行存储管理
- 各个Region Server存放的Region数目大致相同,负载均衡。
- Region内部包含一个HLog日志和多个Store,数据实际上是存储在Store单元中。
Store:
- Region内部按照列簇分为不同的Store
- 每个Store由一个memStore和多个StoreFile组成
- memStore是内存中的一个缓存区
- StoreFile是写到硬盘上的数据文件
- 数据首先会放入MemStore中,当MemStore满了以后会清空形成 一个新StoreFile
- 检索数据时,先在memStore找,然后找StoreFile
- compact操作:
- 当StoreFile文件数量增长到一定阈值时触发
- 将多个StoreFile合并成一个StoreFile
- 在合并过程中会进行StoreFile版本合并和数据删除。
- split操作:
- 当单个StoreFile大小超过一定阈值后触发
- 把当前的Region分裂成2个子Regions
- 子Region会被Master分配到相应的Region Server上
- 是HBase提供的负载均衡机制
HFile:
- StoreFile包含的一个HFile文件
- 是Hadoop的二进制格式文件
- StoreFile是HFile的轻量级包装,数据最终是以HFile的形式存储在Hadoop平台上
- 采用一个简单的byte数组存储数据的每个KeyValue对
- 这个byte数组里面包含了很多项,有固定的格式,每项有具体的含义。
Hbase数据模型与存储模式:
- HBase表特性:
- 面向列的、稀疏的、分布式的、持久化存储的多维排序映射表
- Hbase表索引:
- 行关键字、列簇名、列关键字及时间戳
- Hbase表值形式:
- 一个未经解析的byte数组
- HBase表特性:
Hbase数据模型:
- 以表的形式存储数据
- 表由行和列族组成
- 一个表可包含若干个列族
- 一个列族内可用列限定符来标志不同的列
- 存于表中单元的数据尚需打上时间戳
Hbase数据模型基本元素:
- 表,行键,列族,单元格,时间戳
Hbase数据模型与存储模式:
- Hbase存储逻辑视图:
- 一个三元组(行键,列族:列限制符,时间戳) 可以唯一地确定存储在 单元(Cell)中的数据
- Key是一个三元组(行 键,列族:列限制符, 时间戳)
- Value就是这个三元组定
位的数据值
- 存储物理视图:
一个列族对应生成一个Region:
- Hbase物理存储:
- 表划分出的列族对应着物理存储区的Region
- 列族所包含的列对应着的存储区Region所包含的Store
- 当增大到一个阀值的时候,Region就会等分成两个新的 Region
- Hbase存储逻辑视图:
Hbase寻址机制:
- 三层机构:Zookeeper文件 -> -ROOT-表 -> .META.表 -> 找到存放用户数据 的Region Server位置。
- 客户端从Zookeeper获得Region的存储位置信息后,直接在 Region Server上读写数据。
Hbase扫描读取数据:
- 所有的存储文件被划分成若干个存储块
- 存储块在get或scan操作时会加载到内存中
- HBase顺序地读取一个数据块到内存缓存中
- 再读取相邻数据时从内存中读取而不需要读磁盘。
Hbase写数据:
- Client向Region Server提交写数据请求;
- Region Server找到目标Region;Region检查数据是否schema一致;
- 如果客户端没有指定版本,则获取当前系统时间作为数据版本;
- 将数据更新写入HLog(WAL),只有HLog写入完成之后,commit()才返回 给客户端;
- 将数据更新写入MemStore;
- 判断MemStore的是否需要flush为StoreFile,若是,则flush生成一个新 StoreFile;
- StoreFile数目增长到一定阈值,触发compact合并操作,多个StoreFile合并 成一个StoreFile,同时进行版本合并和数据删除;
- 若单个StoreFile大小超过一定阈值,触发split操作,把当前Region拆分成2 个子Region,原来的Region会下线,新分出的2个子Region会被Master重新分配到相应的Region Server上
更新表:
- 首先写入HLog和MemStore
- MemStore中的数据是排序的
- 当MemStore累计到一定阈值时:
- 创建一个新的MemStore
- 将老的MemStore添加到flush队列,由单独的线程刷写到磁盘上,成为一个新StoreFile
- 系统在HLog中记录一个检查点,表示这个时刻前的变更已持久化
防止数据丢失:
- 每个Region服务器都有一个自己的HLog文件
- 每次启动都检查HLog文件,确认最近一次执行缓存刷新操作之后是否发生新的写入操作
- 发现更新时:
- 写入MemStore -> 刷写到StoreFile -> 删除旧的Hlog文件,开始为用户提供服务
StoreFile合并与分裂:
- 合并 —— 时机:当一个Store中的StoreFile达到一定的阈值时。操作:将同一个key的修改合并到一起,形成一个大的 StoreFile
- 分裂 —— 时机:当StoreFile的大小达到一定阈值后。操作:等分为两个StoreFile。
HBase索引与检索:
- 分块检索
- 三种查询方式:
- 基于单个RowKey的查询:只利于已知行键(RowKey)抽取一条数据项(data record)的查询
- 通过一个RowKey的区间来访问:一次性读取一个子表(数 据块)
- 全表扫描:搜索不知RowKey的数据项、或者读取数据的某 类属性(统计分析中常常用到)
- 结论:基于行键(RowKey)的搜索方式不利于读取数据的某类 属性值(列),延迟高、速度慢、效率低,浪费计算资源
Hbase二次索引表机制:
关键原理:建立主表列到RowKey的逆向映射关系:
- 成本:索引表占用额外空间,多一级搜索
- 收益:避免了全表搜索,大大提高搜索效率
实现技术:
- 表索引 -> 主表的索引列值为索引表的RowKey, 主表的RowKey做为索引表的Qualifier或Value。
- 列索引 -> 增加一个单独列族存储索引值,主表的用户数据列值做为索引列族的Qualifier用户数据Qualifier做为索引列族的列值
Hadoop资源管理与作业调度实现方案 -> 三大组件:
- Zookeeper提供分布式协同服务
- Oozie 提供作业调度和工作流执行
- YARN 提供集群资源管理服务
分布式协同管理组件Zookeeper:
提供服务:统一命名服务,应用配置管理,分布式锁服务,分布式消息队列。
架构:主从架构
Zookeeper服务由一组Server节点组成 -> 每个节点上运行一个Zookeeper程序。
每个server维护内容:自身的内存状态镜像、持久化存储的事务日志和快照。
ZooKeeper集群的数量一般为奇数(能够方便选举出下一个ZooKeeper)
有过半Server可用,整个系统即保持可用性。
Zookeeper的其他相关内容:
节点角色:
- Leader
- Follower
- Observer
失效处理机制:Zookeeper作出快速响应 -> 消息层基于Fast Paxos算法重新推举一个Leader,继续作为协调服务中心处理客户端的写数据请求,并将ZooKeeper协同数据的变更同步(广播方式)到其他的Follower节点。
业务流程:
- 客户端Client连接到Follower发出写数据请求
- 请求发送到Leader节点
- Leader完成元数据更新
- Leader上的数据同步更新到其他Follower节点
统一命名格式:
- 把各种服务名称、地址、及目录信息存放在分层结构中供需要时读取 -> 提供一个分布式序列号生成器
配置管理服务:发布(publish)和订阅(watch)模式
分布锁的实现:独占锁和控制时序锁
分布式消息队列:同步队列和FIFO队列
Oozie:
- 核心功能:
- 工作流:定义作业任务的拓扑和执行逻辑
- 协调器:负责工作流的关联和触发分布式消息队列
- 工作流包括:
- 控制流节点:定义工作流的开始和结束,控制执行路径
- 动作节点:支持不同任务类型
- 核心功能:
Yarn:
- 优势:允许多个应用程序运行在一个集群上,并将资源按需分配给它们,这大大提高了集群资源利用率。YARN允许各类短作业和长服务混合部署在一个集群中, 并提供了容错、资源隔离及负载均衡等方面的支持,这大大简化了作业和服务的部署和管理成本,强化了对应用程序的支持。
- 体系架构 —— Master/Slave架构:
- Master为YARN的Resource Manager
- Slave为NodeManager
- Application Master
- Container
- YARN Client
- 部署方式:
- Resource Manager:部署并运行在NameNode上
- Node Manager:部署在每个DataNode上,作为Resource Manager的节点代理。
- 每个DataNode都包含一个或多个多个Container用于资源调度
- 每一个提交给Hadoop集群的Application都有一个Application Master与之对应,运行在某个DataNode上。
Hadoop / HDFS / Hbase存储系统总结:
- 大数据存储架构: 分布式文件系统HDFS,HBase分布式数据 库,行存储vs.列存储
- HDFS底层存储结构: Namenode/Datanode,分片 (partition),数据块(block), 冗余备份(replica),
机架感知备份存放 - HBase分布式存储结构:
- 逻辑存储结构:key-vlaue键值对、三元组(行键、列族: 列限制符、时间戳)、Hbase数据表
- 物理存储结构:Region, Store, HFile
- HBase索引与检索:二次索引表设计,技术解决方案
MapReduce计算模型
- 分布式并行计算系统:
- S -> Single , I -> Instruction , M -> Multiply , D -> Data
- SISD
- SIMD
- MISD
- MIMD
多指令流多数据流(MIMD)模型:
- 按照处理器是否共享内存划分:
- 多处理器共享内存机器:
- UMA架构
- NUMA架构
- 多计算机独立内存体系:
- MPP架构
- 集群架构
- 归属于MIMD体系的计算机架构:
- 并行向量处理机、对称多处理机、大规模并行处理机、 工作站机群、分布式共享存储处理机
- 多处理器共享内存机器:
- MIMD模型:
- 按照处理器是否共享内存划分:
Cluster 计算架构:
- 集群由多个独立的计算机(服务器或工作站,称为集群节点)通过高速局域网连接在一起,每个节点拥有独立的内存和磁盘,一个节点的CPU不能直接访问另外一个节点的内存空间;
- 每个节点拥有独立的O/S和文件系统;
- 节点间采用消息传递(message passing)方式进行数据交换,使用如 MPI,PVM等中间件;
- 在节点内部(本地机器上)支持共享内存的并行计算模式,可使用 OpenMP、pthreads等编程模型;
- 需要一系列集群平台软件来支持整个系统的管理与运行,包括集群系统 管理软件(如IBM的CSM、xCat等),消息中间件(如MPI,PVM等),作 业管理与调度系统(如LSF、PBS,LoadLeveler);并行文件系统(如PVFS、GPFS等;
- 系统吞吐量大、可靠性高、可扩展性好、计算性价比好(cost-effective);
Cluster计算架构:
- MapReduce是面向大规模数据并行处理的:
- 基于集群的高性能并行计算平台(Cluster Infrastructure) -> 允许用市场上现成的普通PC或性能较高的刀架或机架式服务器,构 成一个包含数千个节点的分布式并行计算集群。
- 并行程序开发与运行框架(Software Framework) -> 提供了一个庞大但设计精良的并行计算软件构架,能自动完成计算 任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动 分配和执行子任务以及收集计算结果,将数据分布存储、数据通信、容 错处理等并行计算中的很多复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
- 并行程序设计模型与方法(Programming Model & Methodology) -> 借助于函数式语言中的设计思想,提供了一种简便的并行程序设计 方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了完整的并行编程接口,完成大规模数据处理。
- Google MapReduce:
- Google在OSDI国际会议上发表了一篇论文。
- Google公司用MapReduce重新改写了整个搜索引擎中的Web文档索引处理。
- 自MapReduce发明后,Google大量用于各种海量数据处理,目前 Google内部有7千以上的程序基于MapReduce实现。
- Google目前在全球的数十个数据中心使用了百万台以上的服务器构成 其强大的Web搜索和海量数据并行计算平台。
- Google可提供超过80亿网页和10亿张图片的检索索引,每天处理2亿 次以上检索请求,平均每个检索耗时0.5秒;每个搜索请求背后有上千个服务器同时进行检索计算和服务。
- Google MapReduce:
- 模仿Google MapReduce,基于 Java设计出了称为Hadoop的开源MapReduce,该项目成为Apache下最重要项目。
- MR的三个基本思想:
- 如何对付大数据:分而治之 -> 对相互间不具有计算依赖关系的大数据,实行并行最自然的方式就是分而 治之(divide-and-conquer)。
- 上升到抽象模型:Mapper和Reducer -> 用Map和Reduce两个函数提供了高层的并行 编程抽象模型。
- 上升到架构:统一构架,为程序员隐藏系统细节 -> MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。
- MR分治法:
- 将大数据集划分为小数据集, 小数据集划分为更小数据集
- 将最终划分的小数据分布到集群节点上
- 以并行方式完成计算处理
- 将计算结果递归融汇,得到最后结果
- 基于M/R的并行计算模型:
MapReduce计算架构 —— JobTracker模式
- 四大组件:
- Client , JobTracker , TaskTracker , Task
- 四大组件:
MR软件架构:
- Job:完整的计算过程,一个作业可以被拆分为若干个Map和Reduce任务完成。
- Task:MR进行并行计算的基本事务单元,分为Map和Reduce任务,一个作业包含多个任务。
MR组件:
- MR程序:我们编写的程序
- JobClient:替程序与 MapReduce运行框架交互的对象
- 组件(续):
- JobTracker:
- MR的管理者
- 协调MR作业
- 分配任务
- 监控任务
- TaskTracker:
- 执行JobTracker分配的任务
- 分为Map和Reduce两类
- JobTracker:
MapReduce计算架构——YARN模式:
- Client , ResourceManager , NodeManager , Container , Application Master
MR主要任务-映射和简化
- Map(映射):负责输入数据的分片,转化,处理,输出中间结果文件。
- Reduce(简化):以Map的输出作为输入,对于中间结果进行合并处理,结果写入HDFS
- 多个进程运行在DataNode上,相互间通过Shuffle阶段交互数据。
数据模式:键值对
- MapReduce以键值对(key-value pair)来完成数据计算处理。
- 键是行键(RowKey),多半用作索引(indexing)
- 值是字符串(character string)或者二进制数组(binary string)
数据输入模式:
- 数据分片——定义
- 大数据文件进行分片,生成一个个InputSplit(简称为split)
- 一个InputSplit对应一个计算任务(task),分配到计算节点,由 map/reduce进程执行计算处理。
- split是我们对数据文件出于计算需要的逻辑划分单位,但一个HDFS文件在集群中实际是以块(block)的物理形式存储的。
Split与Block:
最好split是block的整数倍嗷!!!
- Map数目设置:
- block_size:HDFS文件的block size
- total_size : 输入文件整体的大小
- input_file_num : 输入文件个数
- map数目设置:
- 默认map数目:默认的map数由block_size决定:default_num = total_size / block_size;
- 预设map数目:可通过参数mapred.map.tasks来设置期望的map数目,但是这个数只有 在大于default_num的时候才会生效:goal_num = mapred.map.task
- 设置分片大小(split size):可以通mapred.min.split.size 设置每个task处理的split的大小,但是 这个 大小只有在大于block_size的时候才会生效。 split_size = max(mapred.min.split.size, block_size);
split_num = total_size / split_size; - 计算map数目:compute_map_num = min(split_num, max(default_num, goal_num))
- 每一个map处理的分片是不能跨越文件的,所以,最终的map个数应该为:final_map_num = max(compute_map_num, input_file_num)
- Map数目设置准则:
- 增加map个数→设置mapred.map.tasks 为一个较大的值
- 减小map个数→设置mapred.min.split.size 为一个较大的值
- 输入中有很多小文件,依然想减少map数目→需将小文件merge为大文件,然后使用第二点准则
- 输入格式处理:
- 将不同格式数据转换为键值表: 基础类InputFormat类
- 选择作为输入的文件或对象
- 提供把文件分片的InputSplits()方法
- 为RecordReader读取文件提供一个工厂方法
- 将不同格式数据转换为键值表: 基础类InputFormat类
- MR计算流程:
- MR三个阶段:
- Map(映射):Mapper执行map task,将输出结果写入中间文件
- Shuffle(归并):把Mapper的输出数据归并整理后分发给Reducer处理,包括 merge,combine,sort和partition几个步骤;
- Reduce(化简):Reducer执行reduce task,将最后结果写入HDFS。
- Map / Shuffle / Reduce的任务和实现:
- MR会慢???为什么???三个原因???
- 生成大量中间文件
- 将文件存储到磁盘中
- 中间文件的远程读取
- Map:
- Mapper的输入数据来源:MapContext
- MapContext的实现依赖于:MapContextImpl
- MapContextImpl内部组合:InputSplit和RecordReader
- 提供了读取和封装输入数据键值对(key, value)的方法。
- 对于每一个split,系统都生成一个map task,调用Mapper来执行,将读 入数据转换成键值对格式。
- 完成计算处理后,将输出结果写入中间文件
- 一个Map任务可以在集群的任何计算节点上运行
- 多个Map任务可以并行地运行在集群上
- Shuffle:
- 主要任务: 将每个map task的输出结果进行归并、排序、然后按照一定的 规则分发给Reducer去执行化简步骤
- Shuffle任务实际上涉及到Map的输出以及Reduce的输入
- Shuffle阶段的两个部分:Map相关部分和Reduce相关部分
Map端的shuffle:
- Mapper从HDFS读取split,然后执行 map ()
- 根据key或value以及Reducer数量划分输出中间表,决定交由哪个reduce task来处理
- 将中间数据写入内存缓冲区
- map task完成时,将全部溢写文件归并到一起合成一个溢写文件(Merge)
Reduce端的shuffle:
- Reduce端拉取数据:
- 算法实例:
- 输入文件:一个包含3行文字的文本文件(每个单词间用空格隔开,图 12-18最左侧Input列所示)。
- 输出结果:该文件的词频统计,每一行输出一个键值对“单词,出现 次数”(图12-18最右侧Final result列所示)。
- 第一步Split:
- 第二步Map:
先将split的每一行文字转换成如下的键值对(每行第一个字符的字节 偏移量作为Key):
然后针对每一个split执行map ( )方法,此处为对上述键值对表的每一 行进行词频统计,每一个Map任务(针对一个split)都会生成如下的
键值对:
- 第三步Shuffle:
Map端shuffle,没有定义Combiner
Map端shuffle,定义了Combiner:
Reduce端shuffle:
- 第四步Reduce:
图并行计算框架(BSP)
- 社交网络等常常作为系统计算的一部分,问题包括最短路径,集群,网页排名,连通分支等。
- 基本概念:Vertex , Edge 。无向图,有向图,简单图(任意两顶点间最多只有一条边,且不存在自环的无向图称为简单图)Degree:图G=(V, E)的顶点v的度是与v相连的边的数目(自环边计两次), 记为d(v)。
- 基本问题:子图,染色,路径,网络流,覆盖问题。
- 图计算方法:
- 定义图数据格式,包括输入数据和输出结果格式。
- 建立图计算模型与算法:对于实际问题抽象出图计算模型,图算法设计(全局的循环迭代(iteration)),数学表达。
- 图并行计算的实现:分割,任务及资源调度,迭代步骤,同步与通讯机制。
- 图数据结构,简单图模型的常用存储结构包括:邻接举证,邻接表,十字链表,邻接多重表。
- 图计算例子:上述图最 短路径问题就转化为矩阵的迭代计算问题,计算结果就是图中两点间的最短路径。
- BSP模型:
- 整体同步并行模型,是一个 逻辑概念模型
- BSP超步:
- 核心思想: 将任务分步完成,通过定义SuperStep (超步) 来完成任务的分步计算。将一个大任务分解为一定数量的超步, 而在每一个超步内各计算节点(即组件,Virtual Processor代表) 独立完成本地的计算任务,将计算结果进行本地存储和远程传递以后,在全局时钟的控制下进入下一个超步。
- 计算过程:
- 本地计算:一个superstep内,虚拟处理节点从自身存储器读取数据计算
- 全局通信:每个处理器通过发送和接收消息,与远程节点 交换数据。
- 栅栏同步:当一个处理器遇到栅栏(Barrier)时,会停下 等到其他所有处理器完成计算;每一次Barrier同步也是前
一个超步的完成和下一个超步的开始。
- 计算流程:
- 不同超步的计算单元可以设置为相同或不相同(比如在Superstep 0,Superstep 1这两步内,Peer1与Peer 2 执行不同的计算步骤A和D, 但Peer 5与Peer 6却一直执行相同的计算步骤C)
- 某些进程在特定的超步中可以不必进行障碍同步(比如在Superstep 0, Superstep 1之间,BSP Peer1与BSP Peer2在遇到 Barrier Synchronizationer 1时需进行障碍同步,但BSP Peer 5与BSP Peer 6却勿需同步。在Barrier Synchronizationer 2 时所有进程需要进行障碍同步)。
- BSP计算架构:
- 采用了Master/Slave模式,一个主节点上BSPMaster主程序,多个从节点(Slave)上运行多个GroomServer进程承担计算处理任务。
- 和Hadoop的Master/Slave非常相似,可以很方便部署
- Pregel图并行计算框架:
Master负责计算任务的分配、 调度和管理,具体负责把一个计算作业的大图分割成子图(subgraph),然后把每个子图作为一个计算任务分发给一个工作服务器(Worker)去执行(一个 Worker可能会收到多个计算任务)。多个工作服务器按照超步模式完成并行计算。
- 计算架构:
- Master:
- 图分割及用户输入数据
- 任务分配调度
- 容错机制
- Worker:
- 执行计算任务
- 节点间通信
- 持久化数据:GFS上,中间数据:Local disk上
- Master:
- Pregel图划分:
- 大图按照算法,划分为多个分区(paritition)-> 每个分区都包含了一部分的vertex以及以其为起点的边。Master则将一个或多个分区分发给每个Worker。一个顶点被分配到哪个分区由分割算法(partition algorithm)来决定的。
N为分区总数,ID是这个 顶点的标识符。另外,Pregel也容许用户自己定义partition函数。
- Pregel计算流程:
- 计算,状态更新,同步通信 基于超步superstep来组织
- 一个超步内,每个vertex会调用用户定义函数计算,计算在各个顶点并行执行。
- 所有的顶点的初始状态(superstep 0)均为“active”。一个顶点在一个超步内完成了它的计算任务,没有下一步计算要执行,就可以自己标志为“inactive”,这样它的计算函数不会再被调用,除非它又被激活;一个顶点的“inactive”状态可以为另一个顶点发送过来的消息而变为“active”(即被其他顶点的消息所激活)。
Pregel顶点计算:
- 每一个超步内,各顶点的计算都在节点本地进行,各顶点计算是 独立的,没有对其他顶点计算结果或计算逻辑上的依赖性。
- 没有资源竞争,避免了deadlock。
- 顶点间的通信被局限在步骤之间的barrier期间完成。
- 每个顶点可以在超步内送出给其它顶点的消息,但这些消息不会马 上处理。当这个超步结束时下一个超步开始前,所有的顶点统一处理它们各自收到的消息。
- 当所有的顶点都进入“inactive”状态,且没有消息传递时, Master即可决定这个作业已结束。
- Pregel的顶点间通信采用了纯消息传递(message passing)模式,不包含远程数据读取或共享内存的方式,这是因为两个原因: 一是消息传递模型足够满足各类图算法的通信需要;二是出于性能的考虑。在分布式环境中从远程机器上读取一个值伴随有很高的时间延迟。
Combiner:
- 超步计算之间,一个节点(Worker)上的多个顶点(vertex)可能同时向另一个节点(Worker)上的顶点发送消息。在某些算法中,接受顶点需要的并不 是每一个发送顶点的单独值,而可能是其中的最大值、或是求和值,这种情 况下,Pregel提供了Combiner机制来合并发出消息,使得多个顶点发给同一 目标点的多个消息合并成一条消息,从而减少消息传递开销、降低网络流量
负担。
- Combiner可以在 发送端或者接收端实现
- 超步计算之间,一个节点(Worker)上的多个顶点(vertex)可能同时向另一个节点(Worker)上的顶点发送消息。在某些算法中,接受顶点需要的并不 是每一个发送顶点的单独值,而可能是其中的最大值、或是求和值,这种情 况下,Pregel提供了Combiner机制来合并发出消息,使得多个顶点发给同一 目标点的多个消息合并成一条消息,从而减少消息传递开销、降低网络流量
Aggregator:
- Pregel提供一种Aggregator机制来实现并行计算系统的全局通信、 状态监控和数据查看。
- Pregel使用如图所示的树状结构来实现aggregator 功能,即在一个超步S中,节点(Worker)上的每一个顶点(vertex)都 可以向该节点的aggregator发送一个数据,系统会使用一种Reduce操作来聚合这些数据,产生的值在超步S结束时向更高一级的aggregator传送。
聚合产生值将会对所 有的顶点在超步S+1中可见。Pregel提供 一些预先定义的 aggregators,如可以在各种整数和string类型上执行min,max,sum等操作的aggregator。
- 开源图并行计算框架Hama:
Hama实际上是一个高性能集群上基于BSP并行模型和Hadoop平 台构建的分布式并行计算框架(distributed computing framework),
支持如下领域的大规模数据处理计算:
- 大规模矩阵运算
- 机器学习 (K-means Clustering,Decision Tree)
- 图计算(BFS, PageRank, Bipartite Matching, SSSP,最大流 最小割(MF-MC)算法等)
- 网络算法(神经网络、社交网络分析、网络实时流量监测等)
- 计算架构:
- Hama的计算架构仍然采用了Master/Slave模式,组成部分:BSPMaster , GroomServer , Zookeeper
- 在新版本的Hama计算结构中,原来的BSPMaster被BSP AppMaster替代,GroomServer则改写成了BSPRunner,相应的程序也进行了改写,以匹配YARN运行环境
- BSPMaster:
- GroomServer:
- Zookeeper:
- Hama计算流程:
一个Hama作业(Job)的流程首先分为三部分:JobClient的作业 提交、BSPMaster的初始化与作业分发、以及GroomServer的计算任务执行,如图所示:
- Hama作业流程:
- 作业的提交:
- 作业的初始化:
- 任务调度以及分派:
- GroomServer在运行期间会周期性的向BSPMaster发送 “心跳”信息。“心跳”信息中包含GroomServer的状态,一个 GroomServer可以通过“心跳”信息告知BSPMaster其当前正在运行的任务数以及剩余的空闲任务槽数目。BSPMaster会将各个 GroomServer汇报上来的状态信息缓存起来,作业调度器将会使用这些信息来为作业分配具体的执行节点
- 在进行任务分配之前,BSPMaster必须按照作业调度算法 选择作业。目前Hama只有一个先来先服务(FCFS)作业调度算 法,一旦BSPMaster选定了作业,就可以为作业分派具体的执行节点了。
- 任务运行:
- 作业状态更新:
- Hama作业通常是长时间运行的批处理作业,运行时间会从 几分钟到几个小时不等。由于时间较长,因此用户能够及时的从作 业的运行情况中得到反馈是至关重要的一个设计因素。在Hama的 设计中,每个作业和任务都有一个状态(status)信息用来表示当前作业或任务的状态(state)(如运行,成功完成,失败等),任 务执行进度以及其它的作业统计信息。
- 当BSPMaster收到作业最后一个任务完成的信号后(上文 提到的cleanup任务),它会把作业的状态信息设置为“成功”。然 后,在客户端调用作业信息时,将“成功”标识返回给客户端。客 户端在获知作业已成功运行完毕后,将在控制台上打印作业统计信 息。最后,BSPMaster将会清理该作业占用的相关资源,并通知GroomServer做清理工作。
- 作业调度策略:
- Hama计算框架是通过各节点间的消息发送来达成数据的一 致性的。一个作业在运行过程中发送消息的数量不仅会占用 GroomServer本地内存空间,过大的消息发送量还会影响集群的网络通信性能。Hama根据作业的消息发送量把作业分为如下两类:
- 消息密集型作业是指在作业在运行过程中产生的消息量大于作业本身输入的数据,这类作业不仅使得内存占用率增高,而且 会大量耗费集群网络带宽,很容易产生GroomServer节点过载。典 型的消息密集型作业包括网页排名(PageRank)及单源最短路径
(SSSP)等计算。 - CPU密集型作业也可称为非消息密集型作业,这类作业发送的消息量一般是小于其输入的数据量,在其运行过程中主要是在本地节点上进行运算,只通过少量的网络消息来达成各个计算节点所需的消息交换,甚至不需要发送网络消息。典型的CPU密集型作业如机器学习中k-meams聚类算法等。
- 消息密集型作业是指在作业在运行过程中产生的消息量大于作业本身输入的数据,这类作业不仅使得内存占用率增高,而且 会大量耗费集群网络带宽,很容易产生GroomServer节点过载。典 型的消息密集型作业包括网页排名(PageRank)及单源最短路径
- Hama计算框架是通过各节点间的消息发送来达成数据的一 致性的。一个作业在运行过程中发送消息的数量不仅会占用 GroomServer本地内存空间,过大的消息发送量还会影响集群的网络通信性能。Hama根据作业的消息发送量把作业分为如下两类:
- 作业管理:
- Hama的核心功能,主 要包含作业的提交、调度及 任务(Task)的分发和管理。 作业调度主要由BSPMaster 来完成,BSPMaster负责维 护每个Job的相关信息,将一 个Job的执行分解为多个Task,分配给各GroomServer。
- GroomServer负责执行 Task任务,并将执行状态等
参数返回给BSPMaster。
- FCFS作业调度器:
- 使用一个FIFO队列来管理用户提交的作业,在调度作业时,会 从等待队列中选取队首作业作为下一个执行的任务,并为该作业分配 具体执行节点。FCFS作业调度器的工作流程如图所示(图中各方法均省略了参数名),包含如下调度步骤:
- 步骤:
流计算模型
- 1998年通信领域的美国学者Monika R. Henziger 将流 数据定义为“只能以事先规定好的顺序被读取一次的数据的一个序列”。
- 静态数据处理过程:
- MR和Steam Computing的对比:
- MapReduce批处理(batch processing)模型是先将数据存储于文件系统或数据库,然后对存储系统中的静态数据进行处理计算,这一步骤并不是实时在线的,因此又被称为离线批处理模式。
- 流计算(stream computing)则是在数据到达同时即进行计算处理,计算结果也实时输出,原始输入数据可能保留,也可能丢弃。
- 分布式系统中常用有向非循环图(DAG)来表征计算流程或计算模型。如下图展示了链式任务组合,图中的不同颜色节点 表示不同阶段的计算任务(或计算对象),而单向箭头则表示了计算步骤的顺序和前后依赖关系。
流计算模式:
- Native Stream Processing System:基于数据读入顺序逐条进行处理,每一条数据到达即可得到即时处理(假设系统没有过载),简便易行,系统响应性好。但系统吞吐率(throughput)低,容错成本高和容易负载不均衡。
- Micro-batch Stream Processing System:将数据流先作预处理,打包成含多条数据的batch (批次)再传送给系统处理,系统吞吐率高,但延迟时间长。
流计算性能参数:
- 系统吞吐率:指单位时间内系统处理的数据量或完成的任务数。
- 对于C/S系统而言,服务器端的吞吐率是指服务器在单位时间内对所有的客户端完成的任务数。客户端则是单位时间完成该客户提交任务数目。讨论时一般指服务端。
系统响应延迟:
- 可分为如图三部分:去时传输时间、返回传输时间、服务器处理时间 delay time = network latency+server latency+network latency = 2 * network latency + server latency
- Native流计算系统:客户端系统延迟 delay time = 2Ln + Ls,服务器端系统吞吐率 throughput = 1/(delay time) = 1/(2Ln + Ls)。
- Micro-batch流计算系统(假设十条一个batch):客户端系统延迟 delay time = 2Ln + 10Ls,服务器端系统吞吐率 throughput = 10/(delay time) = 10/(2Ln + 10Ls) = 1/(0.2Ln + Ls)
- 比较就可以发现,如果Ln和Ls都不为0,则Micro-batch的吞吐率要高!
三种流计算模型:
Storm的Topology模型:
- Storm是一种Native Stream Processing System,即对流数据的 处理是基于每条数据进行,其并行 计算是基于由Spout(数据源)和 Bolt(处理单元)组成的有向拓扑图Topology来实现。
- 定义了并行计算的逻辑模型(或者称抽象模型),也即从功能和架构的角度设计了计算的步骤和流程。
其他的一些概念:
- Tuple:基本数据单元,可看作一组各种类型的值域组成的多元组:
- Spout: 数据源单元,负责将输入数据流转换成一个个Tuple, 发送给Bolt处理:
- Bolt:处理单元,负责读取上游传来的Tuple,向下游发送处理后的Tuple。
物理架构如何实现:
- 每个Worker运行一个Supervisor,每个Supervisor管理若干个Executor,每个Executor运行若干个Task.
- 如何对应?
- 逻辑架构:Topology / Tuple / Spout / Bolt
- 物理架构:Nimbus / Supervisor / Executor / Task
- 当一个Storm作业被提交时,同时需要提交预先设计的逻辑Topology。
- Topology里的Spout和Bolt的功能是靠worker节点上的Task来实现,一个Spout或Bolt的任务需要不同worker上的多个Task来并行完成
Spout 并行度 = 2 executors——绿色Bolt并行度 = 2 executors——黄色Bolt并行度 = 6 executors,由此可计算出各个组件所需要 的task数目(每个Green Bolt executor 并发2个task):task = 2 + 2*2 + 6 = 12
这12个线程分配到2个Worker, 每个Worker需运行12/2 = 6 个线程,分属于5个executor。
- Spark的DStream模型:
- Spark 流计算的核心概念是 Discretized Stream,由一组 RDD组成,每个RDD都包含了规定时间段(可设置)流入的数据。 Spark流计算可以基于单个RDD处理、也可基于时间window(包含多个RDD)进行处理。
- Spark的计算程序分为Driver(运行在Master节点上,也有一种模式运行在某一Worker节点上)和Executor(运行在Worker节点上)两部分:
- Driver 负责把应用程序的计算任务转化成有向非循环图(DAG)
- Executor 则负责完成worker节点上的计算和数据存储
- 在每个worker上, Executor针对一个个分发给它的数据partition再生成一 个个Task线程,完成并行计算
- 由于Spark将RDD划分的更小,因此可以进行细粒度的分配。例子:输入DStream需要按键值来进行处理,传统处理系统会把属于一个RDD的所有分区分配到一个worker(图左边所示),如果一个RDD的计算 量比别的RDD大许多,就会造成该节点成为性能瓶颈。而在Spark Streaming 中,属于一个RDD的分区会根据节点荷载状态动态地平衡分配到不同节点上 (图右边),一些节点会处理数量少但耗时长tasks,另一些节点处理数量多
但耗时短的tasks,使得整个系统负载更均衡。
- Samza的Partitioned Stream模型:
- LinkedIn 开源的一个分布式流处理系统,配合使用Kafka(分布式消息处理系统),基于逐条信息快速处理的Native流计算模型,强调低延迟快速处理。与Storm的模式相同,它是基于Kafka提供的分区数据流。
Flink:一个分布式流处理开源框架:
- 特性:
- 即使数据源是无序的或者晚到达的数据,也能保持结果准确性。
- 有状态并且容错,可以无缝的从失败中恢复,并可以保持 exactly-once。
- 3:大规模分布式
- 特性:
对比:
Storm计算架构
- 应用场景:
- 实施分析,在线机器学习,连续计算等
- 推荐系统:实时推荐。金融系统:实时分析股票。预警系统:采集数据判断预警阈值。网站统计:销量,流量统计。
- 计算架构特点:
- 分布式(具有水平拓展能力),实时性(流数据的快速响应处理,实验可以控制在毫秒级),数据规模(海量数据处理,数据可达TB甚至PB),容错性(系统级别容错和故障恢复),简便性(支持多种语言,添加支持只用实现Storm通信协议,简单)
- Storm计算流程:
Storm实时计算架构:
- Flume获取数据,Kafka临时保存数据,Strom计算数据,Redis保存数据。
Storm的逻辑架构:
数据模型 Tuple,数据流 Stream,数据源 Spout,处理单元 Bolt,分发策略 Stream Grouping,策略视图 Topology
多元组Tuple:Tuple是由一组各种类型的值域组成的多元组,所有的基本类型、 字符串以及字节数组都作为Tuple的值域类型,也可以使用用户自己定义的类型,它是Storm的基本数据单元:
- Tuple格式:
- Tuple数据结构:
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); }
数据流Stream:Stream是一个不间断的无界的连续Tuple序列,是对流数据的抽象。
数据源Spout:负责将外部输入数据流转换成Tuple序列
处理单元Bolt:Bolt将所有的消息处理逻辑都封装在执行程序里面, 可执行过滤、聚合、查询数据库等操作,它接收输入的Tuple流并产生输出的新Tuple流
消息分发策略 Stream Grouping:Tuple序列从上游Bolt到下游Bolt其多个并发Task的分组分发方式。
- 随机分组,按字段分组,按字段分组,广播发送,全局分组,不分组,直接分组。
逻辑视图 Topology:Topology是一个由Spout源,Bolt节点,Tuple流,Stream Grouping分发方式组成的一个有向图,代表了一个Storm作业的逻辑架构。
- Storm对数据的处理逻辑与算法封装在Bolt里,那么一个Storm作业的计算流程就封装在Topology里。因此,一个设计好的Topology可以提交到Storm集群去执行。
- Topology只是一个Storm作业 流程的逻辑设计,真正要实现这个逻辑设计,还需要Storm的系统架构或物理模型来支撑。
系统架构:
Storm也采用了主从,Nimbus守护进程类似于Hadoop的JobTracker,负责任务分发和故障检测,通过Zookeeper管理众多的工作节点。每个工作节点跑一个Supervisor守护进程,监听本地节点状态,根据Nimbus的指令 在必要时启动和关闭本节点的工作进程。
- 系统架构(物理视图)组件:
Nimbus:主节点上,是整个流计算集群的控制核心,Topology的提交。运行状体监控,LB和任务重新分配。分配任务包括Topology代码所在路径以及Worker,Executor和Task的信息。
Zookeeper:Hadoop平台提供,是整个集群状态同步协调的核心组件。收集Supervisor,Worker和Executor的心跳信息,Topology出错或者有新的Topology提交,会同步到Zookeeper
Supervisor:运行在工作节点(称为node)上的控制程序,监听本地机器的状态,接受 Nimbus指令管理本地的Worker进程。Nimbus和Supervisor都具有fail-fast(并发线程快速报错)和无状态的特点。
Worker:运行在node上的工作进程。Worker由node + port唯一确定,一个 node上可以有多个Worker进程运行,一个Worker内部可执行多个Task。Worker还负责与远程node的通信。
Executor:提供Task运行时的容器,执行Task的处理逻辑。一个或多个Executor 实例可以运行在一个Worker中,一个或多个Task线程也可运行在一个Executor中。
Task:逻辑组件Spout/Bolt在运行时的实体,也是Executor内并行运行的计算任务。一个Spout/Bolt在运行时可能对应一个或多个Tasks,并行运行在不同节点上。Task数目可在Topology中配置,一旦设定不能改变。
一个运行中的 Topology 由集群中的多个 Worker 进程组成的,在默认情况下,每个 Worker 进程默认启动一个 Executor 线程,在默认情况下,每个 Executor 默认启动一个 Task 线程,Task 是组成 Component 的代码单元。
- 其他注意点:
- 1 个 Worker 进程执行的是 1 个 Topology 的子集,不会出现 1 个 Worker 为多个 Topology 服务的情况,因此 1 个运行中的 Topology 就是由集群中多台物理机上的多个 Worker 进程组成的。
- 1 个 Worker 进程会启动 1 个或多个 Executor 线程来执行 1 个 Topology 的 Component(组件,即 Spout 或 Bolt)
- Executor 是 1 个被 Worker 进程启动的单独线程。每个 Executor 会运行 1 个 Component 中的一个或者多个 Task
- Task 是组成 Component 的代码单元。Topology 启动后,1 个 Component 的 Task 数目是固定不变的,但该 Component 使用的 Executor 线程数可以动态调整(例如:1 个 Executor 线程可以执行该 Component 的 1 个或多个 Task 实例)。这意味着,对于 1 个 Component 来说,#threads<=#tasks(线程数小于等于 Task 数目)这样的情况是存在 的。默认情况下 Task 的数目等于 Executor 线程数,即 1 个 Executor 线程只运行 1 个 Task。
- 工作机制:
非本地模式下,客户端通过 Thrift调用Nimbus接口来上传代码到Nimbus并启动提交操作。Nimbus进行任务分配,并将信息同步到Zookeeper ——> Supervisor定期获取任务分配信息,如果Topology代码缺失,会从Nimbus下载代码,并根据任务分配信息同步Worker ——> Worker根据分配的tasks信息,启动多个Executor线程,同时实例化 Spout,Bolt,Acker等组件,待所有connections(Worker和其它机器通讯的网络连接)启动完毕,此Storm系统即进入工作状态。
Storm的模式:本地模式和分布式模式 -> 本地模式:Storm用一个进程里面的线程来模拟所有的Spout和Bolt。本地模式只对开发测试来说有用。分布式模式:Storm以多进程多线程模式运行在一个集群上。当提交Topology给Nimbus的时候, 同时就提交了Topology的代码。Nimbus负责分发你的代码并且负责给你的topolgoy分配工作进程,如果一个工作进程failed,Nimbus会把它重新分配到其它节点。
- Storm的ACK机制需要解决的问题:跟踪tuple stream的状态,判断每一个是否在Bolt成功完成处理。如果结束,通知对应的task。如果处理失败,要提供处理办法,Acker在内存中,但是负荷太重,消耗内存。
哈希映射 Acker_id = Spout_id MOD(#Ackers)
- Acker01: spout_1 (task 0001, task 0002, task 0003)
- Acker02: spout_2 (task 0004)
- Acker消息机制:
- 算法跟踪的三个环节:
- Spout创建新的tuple给Acker发送消息
- Bolt中tuple被ack的时候给Acker发送消息
- Acker跟踪每一个tuple stream, 根据接收到的消息做按位异或运算,更新自己的ack-val
- Acker跟踪算法维护的数据结构:
{root-id {:spout-task task-id :val ack-val :failed bool-val …} }
- 算法跟踪的三个环节:
- Acker收到的数值:
计数器结构:
Tuple Tree的构成:
前、后 tuples的锚定(anchor)
Spout发出的tuple都带有一个64-bit随机生成的 msgId:
SpoutOutputCollector.emit (new Values("value1","value2"), msgId);
当Bolt向下游输出衍生的tuple时,调用如下方法建立起输入tuple和输出tuple 的关联关系,这称之为锚定(anchor):
- BoltOutputCollector.emit (in-tuple, new Values(word)); //anchor word to in-tuple
emit()建立的tuple关联关系在跟踪这个tuple的Acker那里会构成一张DAG图。 Bolt接收输入tuple进行处理,处理成功则向Acker发送Ack确认;失败则发送fail报错。这样Acker可以跟踪这张Tuple Tree图里每一个tuple的完成状态。
消息发送ACK机制:
- Storm可靠性要求发出的每一个tuple都会完成处理过程,其含义是这个 tuple以及由这个tuple所产生的所有后续的子tuples都被成功处理。由于Storm 是一个实时处理系统,任何一个消息tuple和其子tuples如果没有在设定的 timeout时限内完成处理,那这个消息就失败了,因此Storm需要一种ACK (Acknowledgement)机制来保证每个tuple在规定时限内得到即时处理。这 个timeout时限可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来
设定,Timeout的默认时长为30秒。
- Storm可靠性要求发出的每一个tuple都会完成处理过程,其含义是这个 tuple以及由这个tuple所产生的所有后续的子tuples都被成功处理。由于Storm 是一个实时处理系统,任何一个消息tuple和其子tuples如果没有在设定的 timeout时限内完成处理,那这个消息就失败了,因此Storm需要一种ACK (Acknowledgement)机制来保证每个tuple在规定时限内得到即时处理。这 个timeout时限可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来
Tuple Tree的状态追踪:
- 以如图的Tuple Tree为例,输入tuple A在Bolt处完成了处理,并向下游发 送了2个衍生tuples B和C,在Bolt向跟踪的Acker报告了Ack后,Tuple Tree就只包含了tuples B和C(tuple A打红X表示它已不在当前状态的Tuple Tree中)。
- 然后tuple C流转到下一个Bolt,被处理完后又衍生了tuples D和E。该Bolt 向Acker确认已处理完tuple C,于是C被移出Tuple Tree,当前状态的Tuple Tree变成只包含B,D,E 。。。这一过程将持续进行,直到没有新的tuple加
入这个Tuple Tree,而树中所有的tuples都完成了处理移出了Tuple Tree。
Storm作业的每一个Topology中都包含一个Acker组件。Acker的 任务就是跟踪从Spout发出的每一个tuple及其子tuples的处理完成情 况,实际上Acker是以一种特殊Task运行,可以通过Config.setNumAckers (conf, ackerParal)设置Acker Task的数目大于
1(默认是1),Acker还可用于Spout限流作用:为了避免Spout发送数据太快而 Bolt来不及处理。当Spout有等于或超过pending值的tuples没有收到 Ack或fail了,则Spout跳过nextTuple()方法不生成下一个新tuple,从而限制Spout的发送速度。
- Acker算法:
前面提到,一个Spout发出的tuple的Tuple Tree构成和更新是由处理该 tuple的各个Bolts在流转过程中完成,跟踪这个tuple及其衍生tuples(它们 构成了Tuple Tree)的Acker程序最终基于以下算法判断Tuple Tree是否处理完毕(即树中所有的节点都被Acked),也即判断该tuple处理是否结束:
- 1)当Spout生成一个新tuple时,会向Acker发送如下一条信息通知Acker { spout-tuple-id {:spout-task task-id : val ack-val } }
这里,spout-tuple-id是这条新tuple随机生成的64-bit ID task-id是产生这条tuple的Spout ID,Spout可能有多个task,每个
task都会被分配一个唯一的taskId ack-val:Acker使用的64-bit的校验值计数器,初始值为0收到Spout发来的初始tuple消息后,Acker首先将ack-val(此时为0)与初始tuple的msgId做一个XOR(exclusive OR)运算(下表),并将结果更 新Acker所持的目前ack-val值:ack-val = (ack-val) XOR (spout-tuple-id); - 2)Bolt处理完输入的tuple,若创建了新的衍生tuples向下游发送,在向Acker 发送消息确认输入tuple完成时,它会先把输入tuple的msgId与所有衍生tuples 的msgId(也是64-bit的全新ID)作XOR运算,然后把结果tmp-ack-val包含在 发送的Ack消息中,消息格式是 (spout-tuple-id, tmp-ack-val)。Acker收到每个Bolt发来的Ack消息,都会执行如下运算: ack-val = (ack-val) XOR (tmp-ack-val);
所以Acker所持的ack-val所含值总是目前Tuple Tree中所有tuples的msgId的XOR运算值。
3)当Acker收到一个Ack消息使ack-val = 0时,该条tuple的处理结束, 因为:**(ack-val) XOR (tmp-ack-val) = 0**,意味着ack-val的值与tmp-ack-val相同(只有两个值完全相同时XOR 的运算结果才为0)。这意味着整个Tuple Tree在规定时间内timeout 再无新的tuple产生,整个运算结束。有无可能由于两个衍生tuple的ID值碰巧相同,造成ack-val在Tuple,Tree处理完之前就变成0?由于衍生tuple也是64-bit的随机数,两个 64-bit随机生成的ID值完全一样的概率非常低,几乎可忽略不计,因 此在Tuple Tree处理完之前ack-val为0的概率非常小。
4)根据最后的tuple处理成功或失败结果,Acker会调用对应的Spout 的ack()或fail ()方法通知Spout结果,如果用户重写了ack ()和fail ()方
法,Storm就会按用户的逻辑来进行处理。
- Acker算例:
下面我们以下图的Topology Tree为例讲解Acker算法流程。该 Topology包含1个Spout,3个Bolts,流程步骤如下:
步骤一:Spout读入数据后生成了2个tuples(msgId分别为1001和 1010),通知Acker;
步骤二:tuple 1001流入Bolt1,处理完后产生了新的tuple 1110, Bolt1向Acker发送了tuple 1001的Ack; tuple 1010流入Bolt2,处理完后产生了新的tuple 1111,Bolt2向Acker发送了tuple 1010的Ack;
步骤三:两个tuples 1110,1111流向Bolt3,处理完后不再有新tuple 产生,Bolt3向Acker发送了处理结果的Ack。
ACK关闭:
- 在某些场景下我们不希望使用ACK可靠性机制,或者对一部分 流数据不需要保证处理成功,可以用如下方式关闭或部分关闭ACK功能:
- 把Config.TOPOLOGY_ACKERS设置成0。在这种情况下,Storm会在Spout发射一个tuple之后马上调用Spout的ack ()方法,这 样这个Tuple整个的Tuple Tree不会被跟踪;
- 也可在Spout发射tuple的时候不设定msgId来达到不跟踪这个 tuple的目的,这种发射方式是一种不可靠的发射
- 如果对于一个Tuple Tree的某一部分tuples是否处理成功不关注, 可以在Bolt发射这些Tuple的时候不锚定它们。这样这部分tuples就不会加入到Tuple Tree里面,也就不会被跟踪了。
- 在某些场景下我们不希望使用ACK可靠性机制,或者对一部分 流数据不需要保证处理成功,可以用如下方式关闭或部分关闭ACK功能:
容错机制:
- Storm从任务(线程)、组件(进程)、节点(系统)三个层面 设计了系统容错机制,尽可能实现一种可靠的服务。
- 任务级容错(Task-level):
- 如果Bolt Task线程崩溃,导致流转到该Bolt的tuple未被应答。 此时Acker会将所有与此Bolt Task关联的tuples都设置为为超时失败,并调用对应的Spout的fail ()方法进行后续处理。
- 如果Acker Task本身失效,Storm会判定它在失败之前维护的所有tuples都因超时而失败,对应Spout的fail ()方法将被调用。
- 如果Spout任务失败,在这种情况下,与Spout对接的外部设备 (如MQ队列)负责消息的完整性。例如当客户端异常时,外部 kestrel队列会将处于pending状态的所有消息重新放回队列中。另外, Storm记录有Spout成功处理的进度,当Spout任务重启时,会继续从以前的成功点开始。
- Bolt故障(Process):
- 如果一个Worker进程失败,每个Worker包含的数个Bolt (或Spout) Tasks也失效了。负责监控此Worker的Supervisor会尝试在本机重启它,如 果在启动多次仍然失败,它将无法发送心跳信息到Nimbus,Nimbus将判定此Worker失效,将在另一台机器上重新分配Worker并启动。
- 如果Supervisor失败,由于Supervisor是无状态的(所有的状态都保存 在Zookeeper或者磁盘上)和fail-fast(每当遇到任何意外的情况,进程自 动毁灭),因此Supervisor的失败不会影响当前正在运行的任务,只要及时将Supervisor重新启动即可。
- 如果Nimbus失败,由于Nimbus也是无状态和fail-fast的,因此Nimbus 的失败不会影响当前正在运行的任务,只是无法提交新的Topology,只需及时将它重启即可。
- 集群节点故障(Node):
- 如果Storm集群节点发生故障。此时Nimbus会将此节点上所有正在运行 的任务转移到其他可用的节点上运行
- 若是Zookeeper集群节点故障,Zookeeper自身有容错机制,可以保证少于半数的机器宕机系统仍可正常运行。
WordCount算例:
这里注意,最右边那个是fail,如果某个环节失败了,就会导致spout重新发送一次,重新走一次嗷!!!