apache giraph 调研文档
[TOC]
1. 背景介绍
Apache Giraph是一个为高可扩展性而构建的迭代图形处理系统。Giraph起源于Pregel的开源对应架构,Pregel是谷歌开发的图形处理架构,并在2010年的一篇论文中进行了描述。这两个系统的灵感都来自Leslie Valiant引入的分布式计算的批量同步并行模型。Giraph在基本Pregel模型之外添加了几个功能,包括主计算、分片聚合器、面向边缘的输入、核心外计算等。
2. giraph 项目结构
下面是项目目录中的主要目录介绍。
1 | - giraph #项目根目录 |
3. giraph 编译调试
3.1 拉取代码
1 | git clone git@github.com:apache/giraph.git |
3.2 编译打包
1 | mvn -DskipTests=true package -P hadoop_2,hadoop_trunk |
3.3 调试
本次调研只描述单机通过giraph-examples中的单元测试进行调试。
- idea maven 的配置文件勾选hadoop_2,hadoop_trunk
- 以org.apache.giraph.examples.MaxComputationTest为例:
- 如需调试任务提交流程,可直接在单元测试方法内打断点。
- 如需调试计算流程,可在org.apache.giraph.graph.GraphMapper中的run方法打断点。
- 如需调试任务启动流程,可在org.apache.giraph.graph.GraphMapper中的setup方法打断点。
4. giraph 计算流程
4.1 zookeeper 简单介绍
giraph 利用来zookeeper来进行选主以及计算的同步,有必要对zookeeper进行简单介绍来更易于理解giraph的计算流程。
(下文zk等同于zookeeper)
zk的数据结构是一个具有层次的数据结构,类似于文件系统。
graph LR A["/"] A-->B["/NameService"] B-->B1["/server1"] B-->B2["/server2"] B-->B3["/server3"] A-->C["/conf"] A-->D["/Apps"] D-->D1["/app1"] D-->D2["/app2"] D-->D3["/app3"] D3-->E["/subApp1"] D3-->E1["/subApp2"]
这种数据结构有如下特点:
每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/server1
znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录,
znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
znode 的目录名可以自动编号,如 app1 已经存在,再创建的话,将会自动命名为 app2znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的
zk的应用举例:
统一命名服务(NameService):
分布式应用中,通常需要有一套完整的命名规则,既能够产生唯一的名称又便于人识别和记住。
Name Service 已经是 Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就可以很容易创建一个目录节点。
配置管理:
可以将配置信息保存在zk的某个节点中,当配置信息发生变化时,每台应用节点都会收到zk的通知,便可从zk获取最新的配置信息到本节点。
Master选举:
分布式系统一个重要的模式就是主从模式 (Master/Salves),Zookeeper 可以用于该模式下的 Matser 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 Zookeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Master 节点。
分布式锁:
分布式系统的所有服务节点可以竞争性地去创建同一个临时 ZNode,由于 zl不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,此时可以认为该节点获得了锁。其他没有获得锁的服务节点通过在该 ZNode 上注册监听,从而当锁释放时再去竞争获得锁。锁的释放情况有以下两种:
- 当正常执行完业务逻辑后,客户端主动将临时 ZNode 删除,此时锁被释放;
- 当获得锁的客户端发生宕机时,临时 ZNode 会被自动删除,此时认为锁已经释放。
当锁被释放后,其他服务节点则再次去竞争性地进行创建,但每次都只有一个服务节点能够获取到锁,这就是排他锁
集群管理:
- 如可以通过创建临时节点来建立心跳检测机制。如果分布式系统的某个服务节点宕机了,则其持有的会话会超时,此时该临时节点会被删除,相应的监听事件就会被触发。
- 分布式系统的每个服务节点还可以将自己的节点状态写入临时节点,从而完成状态报告或节点工作进度汇报。
- 通过数据的订阅和发布功能,zk 还能对分布式系统进行模块的解耦和任务的调度。
- 通过监听机制,还能对分布式系统的服务节点进行动态上下线,从而实现服务的动态扩容。
4.2 Giraph中的MapReduce
giraph 利用来MapReduce的Mapper接口(org.apache.giraph.graph.GraphMapper),但是并没有用Mapper接口最重要的map方法,而是重写了setup、cleanup和run方法,giraph只是通过mr来利用Hadoop yarn集群中的计算节点,下面简单介绍一下Mapper接口的setup、cleanup和run方法。
setup: 此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。
giraph利用setup来初始化GraphTaskManager。
run:此方法是MapReduce中mapper task调用的方法,可以看到在这个方法默认实现中一开始调用了setup方法,循环中调用了map方法,最后调用了cleanup方法。
giraph中改写了这个方法,没有调用map方法,调用了setup中初始化的GraphTaskManager的execute() 方法,并且在计算异常时主动删除zk节点,以更快的通知其它节点。
cleanup:此方法在Map任务的最后阶段调用,执行清理操作。
MapReduce任务需要通过代码手动组装任务并提交到yarn集群中,在giraph中通过org.apache.giraph.GiraphRunner提交任务到yarn集群,具体的执行逻辑在org.apache.giraph.job.GiraphJob#run方法里,这里有一个点简单说一下:
在GraphMapper的run方法中我们可以看到
1
2
3
4
5
6
7 > //....
> while (context.nextKeyValue()) {
> graphTaskManager.execute();
> }
> //....
>
>
是在一个while循环里调用execute方法,其实这个循环只会执行一次,这说明context里面只读取到了一个值,这个读取的值通过org.apache.giraph.job.GiraphJob#run方法里的
1
2 > submittedJob.setInputFormatClass(BspInputFormat.class);
>
BspInputFormat.class决定,通过这个类里的createRecordReader决定,这里只返回了一个BspRecordReader,可以看到这个对象里面只会返回一对值,所以这个循环只会出现一次。
另外介绍一下Map中Contex contex的context.progress()方法,这个方法是用来报告自己的状态
4.3 BSP
4.3.1 master
master 的运行逻辑在org.apache.giraph.master.MasterThread的run()方法里。
graph LR A["检查工人状态"]-->B["拆分数据"] B-->C["执行超步循环"]
超步循环最核心的逻辑在 org.apache.giraph.master.BspServiceMaster#coordinateSuperstep方法里
graph TB A["assignPartitionOwners()
1,生成partitionOwners集合
2,zk中创建vertexExchangePath节点
3,向每个work发送分区信息"] A-->B["通过调用barrierOnWorkerList方法来检查work是否写完检查点"] B-->C["发送聚合器,
如果是第一次则初始化聚合器"] C-->D["通过调用barrierOnWorkerList方法来检查work是否完成工作"] D-->E["收集所有聚合值计算并存储"] E-->F["判断计算是否需要停止"] F-->G["创建完成zk节点,
把聚合统计信息写入节点"]
barrierOnWorkerList 原理:创建一个zk中的znode节点,查找该节点的子节点数量是否等于work数量,不想等则进入线程等待状态,后面只要有work在改znode下创建子节点便会触发继续判断是否相等。
4.3.2 client
client的运行逻辑在 org.apache.giraph.graph.GraphTaskManager#execute方法里
execute方法里的while循环便是work超步的循环。
serviceWorker.startSuperstep() 超步计算开始前的准备工作
graph LR; A["prepareSuperstep()
处理上一步的消息"] B["registerHealth
通过向zk注册节点
表明自己的健康状况"] A-->B B-->C["等待分区数据的分配完成"] C-->D["从上一个超步拿到聚合值
通过getSuperstep()
从zk获取"]serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners)
graph LR A["updatePartitionOwners
获取最新的分区信息"] A-->B["sendWorkerPartitions
将分区发送给所有work"] B-->C["getApplicationAttempt
在zk创建一个节点,代表当前节点完成"] C-->D["等待依赖节点完成
通过检查zk vertexExchangePath下的节点"]
processGraphPartitions 执行每一个超步具体的计算
生成 ComputeCallable来做具体的计算工作,通过里面的call来执行具体的计算逻辑
graph TB A["初始化计算线程"] A-->B["获取写入顶点的接口"] B-->C["创建计算的对象
Computation的实例"] C-->D["初始化计算对象和执行计算对象前置方法"] D-->E["获取分区存储、OutOfCoreEngine、taskManager"] E-->F["获取一个分区partition"] F-->G{"partion是否为空"} G-->|Yes| GOK["退出循环,执行计算后的后置操作"] G-->|No| GNO["处理分区数据变化"] GNO-->H1["获取分区的点集合"] H1-->H2{"是否忽视现有点"} H2-->|Yes|H3["遍历分区内目标点的集合
1,获取点的消息
2,调用计算对象的compute计算
3,去除计算过的消息"] H2-->|No|H4["遍历分区的所有点的集合
1,获取点的消息
2,依据点的状态和是否有消息判断是否计算
3,处理边的变化,保存点,去除计算过的消息"] H3-->H5["清理分区"] H4-->H5 subgraph "computePartition 计算分区数据" H1 H2 H3 H4 H5 end H5-->J["一些指标的统计"] J-->F GOK-->L["flush 消息"]
completeSuperstepAndCollectStats 方法里处理每个超步的计算结束。
org.apache.giraph.worker.BspServiceWorker#finishSuperstep
graph LR A["waitForRequestsToFinish
等待所有未完成的请求完成"] B["globalCommHandler.finishSuperste
计算聚合器的值
发送结果给master"] A-->B B-->C["writeFinshedSuperstepInfoToZK
将完成的超步信息写入到zk"] C-->D["waitForOtherWorkers
通过等待master创建完成zk完成标志节点"]
4.4 pregel
4.4.1 pregel 简述
Pregel的计算过程是由一系列被称为“超步”的迭代组成的。
在每个超步中,每个顶点上面都会并行执行用户自定义的函数,该函数描述了一个顶点V在一个超步S中需要执行的操作。
该函数可以读取前一个超步(S-1)中其他顶点发送给顶点V的消息,执行相应计算后,修改顶点V及其出射边的状态,然后沿着顶点V的出射边发送消息给其他顶点,而且,一个消息可能经过多条边的传递后被发送到任意已知ID的目标顶点上去。
graph LR title[顶点之间的消息传递] style title fill:#FFF,stroke:#FFF A["A compute()计算"] B["B compute()计算"] D["C compute()计算"] A---B A---D A.->|"消息"|B B.->|"消息"|A A.->|"消息"|D D.->|"消息"|A
这些消息将会在下一个超步(S+1)中被目标顶点接收,然后像上述过程一样开始下一个超步(S+1)的迭代过程。
在Pregel计算过程中,一个算法什么时候可以结束,是由所有顶点的状态决定的。
在第0个超步,所有顶点处于活跃状态。
当一个顶点不需要继续执行进一步的计算时,就会把自己的状态设置为“停机”,进入非活跃状态。
当一个处于非活跃状态的顶点收到来自其他顶点的消息时,Pregel计算框架必须根据条件判断来决定是否将其显式唤醒进入活跃状态。
当图中所有的顶点都已经标识其自身达到“非活跃(inactive)”状态,并且没有消息在传送的时候,算法就可以停止运行。
graph LR title[状态变化] style title fill:#FFF,stroke:#FFF A(("活跃"))--"不需要执行进一步计算"-->B(("不活跃")) B--"收到消息后"-->A
4.4.2 giraph 代码例子
最根本的是org.apache.giraph.graph.Computation接口,最重要的是实现里面的compute方法。
compute方法会在4.3.2 client 中的ComputeCallable的call来调用
compute的方法实现,例如求连通分量中的实现:
1 |
|