Spark GraphX

图性能优化笔记1

原始方法对求社区对应顶点数过程中,使用了单机函数collectAsMap,当社区数非常多时,该函数将数据汇总到driver节点,导致driver节点发生OOM。
解决办法:避免对大数据量使用collectAsMap操作,改为RDD集合求将交集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def getGraphWithCommVertexNum[ED: ClassTag](g: Graph[VertexAttr, ED]): Graph[VertexAttr, ED] = {
/**
* modified by cdemishangian
*
* BugFix: avoid using collectAsMap lead to driver OOM
*
* val cvnMap = g.vertices.map ( e => (e._2.community, 1)).reduceByKey(_ + _).collectAsMap
* val graph = g.mapVertices{ (vid, vertexAttr) =>
* vertexAttr.commVertexNum = cvnMap.getOrElse(vertexAttr.community, LocalConstants.countMissingVal)
* vertexAttr
* }
*/
val cvnMapRDD = g.vertices.map(e => (e._2.community, 1)).reduceByKey(_ + _)
val commCntRDD = g.vertices.map(r =>{
(r._2.community,r._1)
}).leftOuterJoin(cvnMapRDD).map(r=>
(r._2._1,(r._1,r._2._2.getOrElse(LocalConstants.countMissingVal)))
)
val graph = g.subgraph(vpred = (vid,attr)=> vid != LocalConstants.vidLongMissingVal).joinVertices(commCntRDD){
(vid, vertexAttr, commCnt) =>
vertexAttr.commVertexNum = commCnt._2
vertexAttr
}
graph
}

图性能优化笔记2

由于spark2.0中彻底废弃了mapReduceTriplets函数,改为了aggregateMessages操作,
以下代码为mapReduceTriplets转换为aggregateMessages的转换操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
val nodeWeightMapFunc = (e:EdgeTriplet[VD,Long]) => Iterator((e.srcId,e.attr), (e.dstId,e.attr))
val nodeWeightReduceFunc = (e1:Long,e2:Long) => e1+e2
val nodeWeights = graph.mapReduceTriplets(nodeWeightMapFunc,nodeWeightReduceFunc)
val nodeWeightMapFunc = (e:EdgeContext[VD,Long,Long]) => {
e.sendToDst(e.attr)
e.sendToSrc(e.attr)
}
val nodeWeightReduceFunc = (e1:Long,e2:Long) => e1+e2
val nodeWeights = graph.aggregateMessages(nodeWeightMapFunc,nodeWeightReduceFunc)
/**
* Creates the messages passed between each vertex to convey neighborhood community data.
*/
private def sendMsg(et:EdgeTriplet[VertexState,Long]) = {
val m1 = (et.dstId,Map((et.srcAttr.community,et.srcAttr.communitySigmaTot)->et.attr))
val m2 = (et.srcId,Map((et.dstAttr.community,et.dstAttr.communitySigmaTot)->et.attr))
Iterator(m1, m2)
}
private def sendMsg(et:EdgeContext[VertexState,Long,Map[(Long,Long),Long]]) = {
et.sendToDst(Map((et.srcAttr.community, et.srcAttr.communitySigmaTot) -> et.attr))
et.sendToSrc(Map((et.dstAttr.community, et.dstAttr.communitySigmaTot) -> et.attr))
}

图性能优化笔记3

现在业务场景在计算graphx中的三角计数指标时,由于关系数很多,达到10亿条边,通过观察运行日志发现,每次运行tcNum指标时,会产生25倍于边内存量的Shuffle Read/Write;由于缺乏对底层原理的掌握,一直不知道如何优化,经过几周的间断性尝试,今天终于通过设置图分区策略解决问题:设置PartitionStrategy.EdgePartition2D

1
2
3
4
5
6
7
8
9
10
11
12
def getGraphWithAttrTCNum[ED: ClassTag](g: Graph[VertexAttr, ED]): Graph[VertexAttr, ED] = {
val newGraph = g.convertToCanonicalEdges().partitionBy(PartitionStrategy.EdgePartition2D)
val tcv = newGraph.triangleCount().vertices
logWarning("JRDM:"+tcv.count)
val graph = g.outerJoinVertices(tcv) { (id, a, o) =>
a.tcNum = o.getOrElse(LocalConstants.countMissingVal)
a
}
graph
}

val lineArray = line.split(“\s+”)//\s表示空格,回车,换行等空白符, +号表示一个或多个

流程

  1. sc.textFile 读文件,生成原始的RDD
  2. 每个分区(的计算节点)把每条记录放进 PrimitiveVector 里,这个结构是spark里为primitive数据优化的存储结构。
    把 PrimitiveVector 里的数据一条条取出,转化成 EdgePartition ,即 EdgeRDD 的分区实现。这个过程中生成了面向列存的结构:src点的array,dst点的array,edge的属性array,以及两个正反向map(用于对应点的local id和global id)。
  3. 对 EdgeRDD 做一次count触发这次边建模任务,真正persist起来。
  4. 用 EdgePartition 去生成一个 RoutingTablePartition ,里面是vertexId到partitionId的对应关系,借助 RoutingTablePartition 生成 VertexRDD 。
  5. 由 EdgeRDD 和 VertexRDD 生成 Graph。前者维护了边的属性、边两头顶点的属性、两头顶点各自的global vertexID、两头顶点各自的local Id(在一个edge分区里的array index)、用于寻址array的正反向map。后者维护了点存在于哪个边的分区上的Map。

我们对 Fast Unfolding 算法做一个简要介绍,它分为以下两个阶段:

第一个阶段:首先将每个节点指定到唯一的一个社区,然后按顺序将节点在这些社区间进行移动。怎么移动呢?以上图中的节点 i 为例,它有三个邻居节点 j1, j2, j3,我们分别尝试将节点 i 移动到 j1, j2, j3 所在的社区,并计算相应的 modularity 变化值,哪个变化值最大就将节点 i 移动到相应的社区中去(当然,这里我们要求最大的 modularity 变化值要为正,如果变化值均为负,则节点 i 保持不动)。按照这个方法反复迭代,直到网络中任何节点的移动都不能再改善总的 modularity 值为止。

第二个阶段:将第一个阶段得到的社区视为新的“节点”(一个社区对应一个),重新构造子图,两个新“节点”之间边的权值为相应两个社区之间各边的权值的总和。

我们将上述两个阶段合起来称为一个 pass,显然,这个 pass 可以继续下去。

从上述描述我们可以看出,这种算法包含了一种 hierarchy 结构,正如对一个学校的所有初中生进行聚合一样,首先我们可以将他们按照班级来聚合,进一步还可以在此基础上按照年级来聚合,两次聚合都可以看做是一个社区发现结果,就看你想要聚合到什么层次与程度。

标签传播算法(LPA)的做法比较简单:
第一步: 为所有节点指定一个唯一的标签;
第二步: 逐轮刷新所有节点的标签,直到达到收敛要求为止。对于每一轮刷新,节点标签刷新的规则如下:
对于某一个节点,考察其所有邻居节点的标签,并进行统计,将出现个数最多的那个标签赋给当前节点。当个数最多的标签不唯一时,随机选一个。

注:算法中的记号 N_n^k 表示节点 n 的邻居中标签为 k 的所有节点构成的集合。

并行化问题及解决策略

进行并行化处理时,我们主要遇到两个问题:一是中间计算量过大,二是消息滞后。

中间计算量过大
如果直接使用公式(1)进行Modularity计算,会导致中间计算量过大,因为它需要考虑两两节点对的情况(pairwise),即n平方的量级(n为节点个数),在大数据量情况下并不可行。

尝试的一个解决方法是,进行分步计算,如根据节点Id的hash值将数据划分成100个分区,每次只对分区内的节点进行计算。但是这种方法处理不直观,效率也不高。

经过反复尝试后,我们发现,更好的解决方法是使用化简后的公式(2)进行处理,避免了pairwise的过程。

消息滞后
由于在并行化处理时,在t轮时每个节点根据t-1轮时的邻居社区信息进行更新,存在一定的消息滞后现象,会造成 “互换社区” 的问题

每个节点被分配到不同的社区中(节点1属于G1,节点2属于G2,节点3属于G3,节点4属于G4)
第二轮b图时,每个节点根据它邻居的信息进行更新(如节点1的新社区为邻居节点2在第一轮的社区G2)
最终情况会导致不相连的节点反而归属同一社区(如节点1与3均受到节点2的影响,归属社区G2)
第三轮c图类似,造成社区的互换。造成这种情况的原因在于,每个节点根据它的邻居前一轮的信息进行变化,而它的邻居也在同步改变。

类似的,还会存在有 “社区归属延迟” 问题。示意图如图4所示。节点1的归属社区受到节点2的影响,归属到社区2。但是节点2的社区也在同步变化,它可能归属于社区3,这样就造成只有节点1归属到社区2,成为一个孤立的点。

考虑有以下两种解决策略:

添加随机值,即每轮迭代中会有部分节点的社区保持不变。如果阈值足够高,其实相当于逐个节点进行社区信息的更新,也即与串行的方法等价。使用随机值带来的问题是不能保证结果,得到的Modularity值有时高,有时低。并且,“互换社区”的问题不一定能解决。考虑到的一种解决思路是,多次运行,取最优。但是,这种方法也不太可靠,随机性较大。

得到结果后构建逻辑图,求解连通区域,将同一个连通区域的点都归为一个社区。比如初始结果是互换社区的<1,2>,<2,1>(格式为<节点Id,归属社区>),求连通区域就可以将它们都归属同一社区。这种思路也可以解决 “社区归属延迟”的问题,如初始结果是<1,2>,<2,3>,<3,4>,节点1应该与归属社区2,但是节点2又归属于社区3,所以最终应该节点1,2,3都归属社区3。

对比上面两种方法,后一种策略充分考虑了图的特性,更为可取,能够保证结果的稳定性。大致代码如下:

总结

FastUnfolding算法,基于结果Modularity值的优化进行,得到的社区发现效果比较理想,对比LPA算法会更稳定。并且,FastUnfolding算法会不断合并节点构造新图,大大减少了计算量,使得大规模图数据的计算成为可能。

原始的FastUnfolding算法采用串行化的实现思路,不适合面对海量数据。实现中需要进行算法并行化,充分利用并行化框架带来的计算优势。在将传统的串行化算法改造成并行化算法的过程中时,会遇到中间计算量过大、消息滞后造成的问题,如“互换社区”和“社区归属延迟”问题。解决的思路是考虑图的特性,对结果再次求解连通图区域,并通过重置社区得到最终结果。这样既保证了算法的准确性,又保证其性能,从而能够在大规模的网络上,进行实际的生产应用。

Reference:

[1]: GraphX 图数据建模和存储