图性能优化笔记1
原始方法对求社区对应顶点数过程中,使用了单机函数collectAsMap,当社区数非常多时,该函数将数据汇总到driver节点,导致driver节点发生OOM。解决办法
:避免对大数据量使用collectAsMap操作,改为RDD集合求将交集。
|
|
图性能优化笔记2
由于spark2.0中彻底废弃了mapReduceTriplets函数,改为了aggregateMessages操作,
以下代码为mapReduceTriplets转换为aggregateMessages的转换操作
|
|
图性能优化笔记3
现在业务场景在计算graphx中的三角计数指标时,由于关系数很多,达到10亿条边,通过观察运行日志发现,每次运行tcNum指标时,会产生25倍于边内存量的Shuffle Read/Write;由于缺乏对底层原理的掌握,一直不知道如何优化,经过几周的间断性尝试,今天终于通过设置图分区策略解决问题:设置PartitionStrategy.EdgePartition2D
val lineArray = line.split(“\s+”)//\s表示空格,回车,换行等空白符, +号表示一个或多个
流程
- sc.textFile 读文件,生成原始的RDD
- 每个分区(的计算节点)把每条记录放进 PrimitiveVector 里,这个结构是spark里为primitive数据优化的存储结构。
把 PrimitiveVector 里的数据一条条取出,转化成 EdgePartition ,即 EdgeRDD 的分区实现。这个过程中生成了面向列存的结构:src点的array,dst点的array,edge的属性array,以及两个正反向map(用于对应点的local id和global id)。 - 对 EdgeRDD 做一次count触发这次边建模任务,真正persist起来。
- 用 EdgePartition 去生成一个 RoutingTablePartition ,里面是vertexId到partitionId的对应关系,借助 RoutingTablePartition 生成 VertexRDD 。
- 由 EdgeRDD 和 VertexRDD 生成 Graph。前者维护了边的属性、边两头顶点的属性、两头顶点各自的global vertexID、两头顶点各自的local Id(在一个edge分区里的array index)、用于寻址array的正反向map。后者维护了点存在于哪个边的分区上的Map。
我们对 Fast Unfolding 算法做一个简要介绍,它分为以下两个阶段:
第一个阶段:首先将每个节点指定到唯一的一个社区,然后按顺序将节点在这些社区间进行移动。怎么移动呢?以上图中的节点 i 为例,它有三个邻居节点 j1, j2, j3,我们分别尝试将节点 i 移动到 j1, j2, j3 所在的社区,并计算相应的 modularity 变化值,哪个变化值最大就将节点 i 移动到相应的社区中去(当然,这里我们要求最大的 modularity 变化值要为正,如果变化值均为负,则节点 i 保持不动)。按照这个方法反复迭代,直到网络中任何节点的移动都不能再改善总的 modularity 值为止。
第二个阶段:将第一个阶段得到的社区视为新的“节点”(一个社区对应一个),重新构造子图,两个新“节点”之间边的权值为相应两个社区之间各边的权值的总和。
我们将上述两个阶段合起来称为一个 pass,显然,这个 pass 可以继续下去。
从上述描述我们可以看出,这种算法包含了一种 hierarchy 结构,正如对一个学校的所有初中生进行聚合一样,首先我们可以将他们按照班级来聚合,进一步还可以在此基础上按照年级来聚合,两次聚合都可以看做是一个社区发现结果,就看你想要聚合到什么层次与程度。
标签传播算法(LPA)的做法比较简单:
第一步: 为所有节点指定一个唯一的标签;
第二步: 逐轮刷新所有节点的标签,直到达到收敛要求为止。对于每一轮刷新,节点标签刷新的规则如下:
对于某一个节点,考察其所有邻居节点的标签,并进行统计,将出现个数最多的那个标签赋给当前节点。当个数最多的标签不唯一时,随机选一个。
注:算法中的记号 N_n^k 表示节点 n 的邻居中标签为 k 的所有节点构成的集合。
并行化问题及解决策略
进行并行化处理时,我们主要遇到两个问题:一是中间计算量过大,二是消息滞后。
中间计算量过大
如果直接使用公式(1)进行Modularity计算,会导致中间计算量过大,因为它需要考虑两两节点对的情况(pairwise),即n平方的量级(n为节点个数),在大数据量情况下并不可行。
尝试的一个解决方法是,进行分步计算,如根据节点Id的hash值将数据划分成100个分区,每次只对分区内的节点进行计算。但是这种方法处理不直观,效率也不高。
经过反复尝试后,我们发现,更好的解决方法是使用化简后的公式(2)进行处理,避免了pairwise的过程。
消息滞后
由于在并行化处理时,在t轮时每个节点根据t-1轮时的邻居社区信息进行更新,存在一定的消息滞后现象,会造成 “互换社区” 的问题
每个节点被分配到不同的社区中(节点1属于G1,节点2属于G2,节点3属于G3,节点4属于G4)
第二轮b图时,每个节点根据它邻居的信息进行更新(如节点1的新社区为邻居节点2在第一轮的社区G2)
最终情况会导致不相连的节点反而归属同一社区(如节点1与3均受到节点2的影响,归属社区G2)
第三轮c图类似,造成社区的互换。造成这种情况的原因在于,每个节点根据它的邻居前一轮的信息进行变化,而它的邻居也在同步改变。
类似的,还会存在有 “社区归属延迟” 问题。示意图如图4所示。节点1的归属社区受到节点2的影响,归属到社区2。但是节点2的社区也在同步变化,它可能归属于社区3,这样就造成只有节点1归属到社区2,成为一个孤立的点。
考虑有以下两种解决策略:
添加随机值,即每轮迭代中会有部分节点的社区保持不变。如果阈值足够高,其实相当于逐个节点进行社区信息的更新,也即与串行的方法等价。使用随机值带来的问题是不能保证结果,得到的Modularity值有时高,有时低。并且,“互换社区”的问题不一定能解决。考虑到的一种解决思路是,多次运行,取最优。但是,这种方法也不太可靠,随机性较大。
得到结果后构建逻辑图,求解连通区域,将同一个连通区域的点都归为一个社区。比如初始结果是互换社区的<1,2>,<2,1>(格式为<节点Id,归属社区>),求连通区域就可以将它们都归属同一社区。这种思路也可以解决 “社区归属延迟”的问题,如初始结果是<1,2>,<2,3>,<3,4>,节点1应该与归属社区2,但是节点2又归属于社区3,所以最终应该节点1,2,3都归属社区3。3,4>2,3>1,2>2,1>1,2>
对比上面两种方法,后一种策略充分考虑了图的特性,更为可取,能够保证结果的稳定性。大致代码如下:
总结
FastUnfolding算法,基于结果Modularity值的优化进行,得到的社区发现效果比较理想,对比LPA算法会更稳定。并且,FastUnfolding算法会不断合并节点构造新图,大大减少了计算量,使得大规模图数据的计算成为可能。
原始的FastUnfolding算法采用串行化的实现思路,不适合面对海量数据。实现中需要进行算法并行化,充分利用并行化框架带来的计算优势。在将传统的串行化算法改造成并行化算法的过程中时,会遇到中间计算量过大、消息滞后造成的问题,如“互换社区”和“社区归属延迟”问题。解决的思路是考虑图的特性,对结果再次求解连通图区域,并通过重置社区得到最终结果。这样既保证了算法的准确性,又保证其性能,从而能够在大规模的网络上,进行实际的生产应用。
Reference:
[1]: GraphX 图数据建模和存储