apache giraph 调研文档

apache giraph 调研文档

[TOC]

1. 背景介绍

Apache Giraph是一个为高可扩展性而构建的迭代图形处理系统。Giraph起源于Pregel的开源对应架构,Pregel是谷歌开发的图形处理架构,并在2010年的一篇论文中进行了描述。这两个系统的灵感都来自Leslie Valiant引入的分布式计算的批量同步并行模型。Giraph在基本Pregel模型之外添加了几个功能,包括主计算、分片聚合器、面向边缘的输入、核心外计算等。
 

2. giraph 项目结构

下面是项目目录中的主要目录介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
- giraph #项目根目录
- giraph-block-app #giraph block api 的实现,不在本次调研范围内
- giraph-core #giraph pregel计算的核心实现,本次调研主要关注范围
-src/main/java
- com.yammer.metrics.core #一些空的指标实现
- org.apache.giraph
├── Algorithm.java #内置算法的名字注释
├── GiraphRunner.java #提交在hadoop上运行的任务入口
├── aggregators #giraph的聚合器相关
├── benchmark #基准测试
├── bsp #bsp的对象包
├── combiner #消息的Combiner包
├── comm #通信相关的包
├── conf #配置相关的包
├── counters #处理hadoop记数的包
├── edge #图的边的包
├── factories #工厂包,用于创建各种类型
├── function #函数接口包,
├── graph #与图形相关的包,关键的计算接口Computation也在这里
├── io #io相关的包
├── job #job启动中涉及的包
├── jython #jython集成包
├── mapping #主要是mappingstore
├── master #与master有关的对象包
├── metrics #指标相关的包
├── ooc #核心外围的包
├── partition #分区相关的对象包
├── reducers #reduce 相关的包
├── scripting #giraph 脚本相关的包
├── time #time相关的包
├── types #类型转换相关的包
├── utils #工具类
├── worker #计算节点work相关的包
├── writable #kryo和tuple相关的包
├── yarn #hadoop yarn相关的包
└── zk #zookeeper相关的包
- giraph-examples 一些例子代码,和可供调试的单元测试

3. giraph 编译调试

3.1 拉取代码

1
git clone git@github.com:apache/giraph.git

3.2 编译打包

1
mvn -DskipTests=true package -P hadoop_2,hadoop_trunk

3.3 调试

本次调研只描述单机通过giraph-examples中的单元测试进行调试。

  • idea maven 的配置文件勾选hadoop_2,hadoop_trunk
  • org.apache.giraph.examples.MaxComputationTest为例:
    • 如需调试任务提交流程,可直接在单元测试方法内打断点。
    • 如需调试计算流程,可在org.apache.giraph.graph.GraphMapper中的run方法打断点。
    • 如需调试任务启动流程,可在org.apache.giraph.graph.GraphMapper中的setup方法打断点。

4. giraph 计算流程

4.1 zookeeper 简单介绍

giraph 利用来zookeeper来进行选主以及计算的同步,有必要对zookeeper进行简单介绍来更易于理解giraph的计算流程。

(下文zk等同于zookeeper)

zk的数据结构是一个具有层次的数据结构,类似于文件系统。

graph LR
    A["/"]
    A-->B["/NameService"]
    B-->B1["/server1"]
    B-->B2["/server2"]
    B-->B3["/server3"]
    A-->C["/conf"]
    A-->D["/Apps"]
    D-->D1["/app1"]
    D-->D2["/app2"]
    D-->D3["/app3"]
    D3-->E["/subApp1"]
    D3-->E1["/subApp2"]

这种数据结构有如下特点:

  • 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/server1

  • znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录,

  • znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据

  • znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
    znode 的目录名可以自动编号,如 app1 已经存在,再创建的话,将会自动命名为 app2

  • znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的

zk的应用举例:

  • 统一命名服务(NameService):

    分布式应用中,通常需要有一套完整的命名规则,既能够产生唯一的名称又便于人识别和记住。

    Name Service 已经是 Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就可以很容易创建一个目录节点。

  • 配置管理:

    可以将配置信息保存在zk的某个节点中,当配置信息发生变化时,每台应用节点都会收到zk的通知,便可从zk获取最新的配置信息到本节点。

  • Master选举:

    分布式系统一个重要的模式就是主从模式 (Master/Salves),Zookeeper 可以用于该模式下的 Matser 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 Zookeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Master 节点。

  • 分布式锁:

    分布式系统的所有服务节点可以竞争性地去创建同一个临时 ZNode,由于 zl不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,此时可以认为该节点获得了锁。其他没有获得锁的服务节点通过在该 ZNode 上注册监听,从而当锁释放时再去竞争获得锁。锁的释放情况有以下两种:

    • 当正常执行完业务逻辑后,客户端主动将临时 ZNode 删除,此时锁被释放;
    • 当获得锁的客户端发生宕机时,临时 ZNode 会被自动删除,此时认为锁已经释放。

    当锁被释放后,其他服务节点则再次去竞争性地进行创建,但每次都只有一个服务节点能够获取到锁,这就是排他锁

  • 集群管理:

    • 如可以通过创建临时节点来建立心跳检测机制。如果分布式系统的某个服务节点宕机了,则其持有的会话会超时,此时该临时节点会被删除,相应的监听事件就会被触发。
    • 分布式系统的每个服务节点还可以将自己的节点状态写入临时节点,从而完成状态报告或节点工作进度汇报。
    • 通过数据的订阅和发布功能,zk 还能对分布式系统进行模块的解耦和任务的调度。
    • 通过监听机制,还能对分布式系统的服务节点进行动态上下线,从而实现服务的动态扩容。

4.2 Giraph中的MapReduce

giraph 利用来MapReduce的Mapper接口(org.apache.giraph.graph.GraphMapper),但是并没有用Mapper接口最重要的map方法,而是重写了setup、cleanup和run方法,giraph只是通过mr来利用Hadoop yarn集群中的计算节点,下面简单介绍一下Mapper接口的setup、cleanup和run方法。

  • setup: 此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。

    giraph利用setup来初始化GraphTaskManager。

  • run:此方法是MapReduce中mapper task调用的方法,可以看到在这个方法默认实现中一开始调用了setup方法,循环中调用了map方法,最后调用了cleanup方法。

    giraph中改写了这个方法,没有调用map方法,调用了setup中初始化的GraphTaskManager的execute() 方法,并且在计算异常时主动删除zk节点,以更快的通知其它节点。

  • cleanup:此方法在Map任务的最后阶段调用,执行清理操作。

MapReduce任务需要通过代码手动组装任务并提交到yarn集群中,在giraph中通过org.apache.giraph.GiraphRunner提交任务到yarn集群,具体的执行逻辑在org.apache.giraph.job.GiraphJob#run方法里,这里有一个点简单说一下:

在GraphMapper的run方法中我们可以看到

1
2
3
4
5
6
7
> //....
> while (context.nextKeyValue()) {
> graphTaskManager.execute();
> }
> //....
>
>

是在一个while循环里调用execute方法,其实这个循环只会执行一次,这说明context里面只读取到了一个值,这个读取的值通过org.apache.giraph.job.GiraphJob#run方法里的

1
2
> submittedJob.setInputFormatClass(BspInputFormat.class);
>

BspInputFormat.class决定,通过这个类里的createRecordReader决定,这里只返回了一个BspRecordReader,可以看到这个对象里面只会返回一对值,所以这个循环只会出现一次。

另外介绍一下Map中Contex contex的context.progress()方法,这个方法是用来报告自己的状态

4.3 BSP

4.3.1 master

master 的运行逻辑在org.apache.giraph.master.MasterThread的run()方法里。

graph LR
    A["检查工人状态"]-->B["拆分数据"]
    B-->C["执行超步循环"]

超步循环最核心的逻辑在 org.apache.giraph.master.BspServiceMaster#coordinateSuperstep方法里

graph TB
    A["assignPartitionOwners() 
1,生成partitionOwners集合
2,zk中创建vertexExchangePath节点
3,向每个work发送分区信息"] A-->B["通过调用barrierOnWorkerList方法来检查work是否写完检查点"] B-->C["发送聚合器,
如果是第一次则初始化聚合器"] C-->D["通过调用barrierOnWorkerList方法来检查work是否完成工作"] D-->E["收集所有聚合值计算并存储"] E-->F["判断计算是否需要停止"] F-->G["创建完成zk节点,
把聚合统计信息写入节点"]

barrierOnWorkerList 原理:创建一个zk中的znode节点,查找该节点的子节点数量是否等于work数量,不想等则进入线程等待状态,后面只要有work在改znode下创建子节点便会触发继续判断是否相等。

4.3.2 client

client的运行逻辑在 org.apache.giraph.graph.GraphTaskManager#execute方法里

execute方法里的while循环便是work超步的循环。

  • serviceWorker.startSuperstep() 超步计算开始前的准备工作

      graph LR;
        A["prepareSuperstep()
    处理上一步的消息"] B["registerHealth
    通过向zk注册节点
    表明自己的健康状况"] A-->B B-->C["等待分区数据的分配完成"] C-->D["从上一个超步拿到聚合值
    通过getSuperstep()
    从zk获取"]
  • serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners)

      graph LR 
        A["updatePartitionOwners
    获取最新的分区信息"] A-->B["sendWorkerPartitions
    将分区发送给所有work"] B-->C["getApplicationAttempt
    在zk创建一个节点,代表当前节点完成"] C-->D["等待依赖节点完成
    通过检查zk vertexExchangePath下的节点"]
  • processGraphPartitions 执行每一个超步具体的计算

    • 生成 ComputeCallable来做具体的计算工作,通过里面的call来执行具体的计算逻辑

          graph TB
      A["初始化计算线程"]
      A-->B["获取写入顶点的接口"]
      B-->C["创建计算的对象
      Computation的实例"] C-->D["初始化计算对象和执行计算对象前置方法"] D-->E["获取分区存储、OutOfCoreEngine、taskManager"] E-->F["获取一个分区partition"] F-->G{"partion是否为空"} G-->|Yes| GOK["退出循环,执行计算后的后置操作"] G-->|No| GNO["处理分区数据变化"] GNO-->H1["获取分区的点集合"] H1-->H2{"是否忽视现有点"} H2-->|Yes|H3["遍历分区内目标点的集合
      1,获取点的消息
      2,调用计算对象的compute计算
      3,去除计算过的消息"] H2-->|No|H4["遍历分区的所有点的集合
      1,获取点的消息
      2,依据点的状态和是否有消息判断是否计算
      3,处理边的变化,保存点,去除计算过的消息"] H3-->H5["清理分区"] H4-->H5 subgraph "computePartition 计算分区数据" H1 H2 H3 H4 H5 end H5-->J["一些指标的统计"] J-->F GOK-->L["flush 消息"]
  • completeSuperstepAndCollectStats 方法里处理每个超步的计算结束。

    • org.apache.giraph.worker.BspServiceWorker#finishSuperstep

          graph LR
      A["waitForRequestsToFinish 
      等待所有未完成的请求完成"] B["globalCommHandler.finishSuperste
      计算聚合器的值
      发送结果给master"] A-->B B-->C["writeFinshedSuperstepInfoToZK
      将完成的超步信息写入到zk"] C-->D["waitForOtherWorkers
      通过等待master创建完成zk完成标志节点"]

4.4 pregel

4.4.1 pregel 简述

Pregel的计算过程是由一系列被称为“超步”的迭代组成的。

在每个超步中,每个顶点上面都会并行执行用户自定义的函数,该函数描述了一个顶点V在一个超步S中需要执行的操作。

该函数可以读取前一个超步(S-1)中其他顶点发送给顶点V的消息,执行相应计算后,修改顶点V及其出射边的状态,然后沿着顶点V的出射边发送消息给其他顶点,而且,一个消息可能经过多条边的传递后被发送到任意已知ID的目标顶点上去。

graph LR
title[顶点之间的消息传递]
style title fill:#FFF,stroke:#FFF
A["A compute()计算"]
B["B compute()计算"]
D["C compute()计算"]
A---B
A---D
A.->|"消息"|B
B.->|"消息"|A
A.->|"消息"|D
D.->|"消息"|A

这些消息将会在下一个超步(S+1)中被目标顶点接收,然后像上述过程一样开始下一个超步(S+1)的迭代过程。

在Pregel计算过程中,一个算法什么时候可以结束,是由所有顶点的状态决定的。

在第0个超步,所有顶点处于活跃状态。

当一个顶点不需要继续执行进一步的计算时,就会把自己的状态设置为“停机”,进入非活跃状态。

当一个处于非活跃状态的顶点收到来自其他顶点的消息时,Pregel计算框架必须根据条件判断来决定是否将其显式唤醒进入活跃状态。

当图中所有的顶点都已经标识其自身达到“非活跃(inactive)”状态,并且没有消息在传送的时候,算法就可以停止运行。

graph LR
title[状态变化]
style title fill:#FFF,stroke:#FFF
A(("活跃"))--"不需要执行进一步计算"-->B(("不活跃"))
B--"收到消息后"-->A

4.4.2 giraph 代码例子

最根本的是org.apache.giraph.graph.Computation接口,最重要的是实现里面的compute方法。

compute方法会在4.3.2 client 中的ComputeCallable的call来调用

compute的方法实现,例如求连通分量中的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Override
public void compute(
Vertex<IntWritable, IntWritable, NullWritable> vertex,
Iterable<IntWritable> messages) throws IOException {
int currentComponent = vertex.getValue().get();

// First superstep is special, because we can simply look at the neighbors
if (getSuperstep() == 0) {
for (Edge<IntWritable, NullWritable> edge : vertex.getEdges()) {
int neighbor = edge.getTargetVertexId().get();
if (neighbor < currentComponent) {
currentComponent = neighbor;
}
}
// Only need to send value if it is not the own id
if (currentComponent != vertex.getValue().get()) {
vertex.setValue(new IntWritable(currentComponent));
for (Edge<IntWritable, NullWritable> edge : vertex.getEdges()) {
IntWritable neighbor = edge.getTargetVertexId();
if (neighbor.get() > currentComponent) {
sendMessage(neighbor, vertex.getValue());
}
}
}

vertex.voteToHalt();
return;
}

boolean changed = false;
// did we get a smaller id ?
for (IntWritable message : messages) {
int candidateComponent = message.get();
if (candidateComponent < currentComponent) {
currentComponent = candidateComponent;
changed = true;
}
}

// propagate new component id to the neighbors
if (changed) {
vertex.setValue(new IntWritable(currentComponent));
sendMessageToAllEdges(vertex, vertex.getValue());
}
vertex.voteToHalt();
}