作者:淘宝技术部-刀
现实生活中存在各种各样的网络,诸如人际关系网、交易网、运输网等等。对这些网络进行社区发现具有极大的意义,如在人际关系网中,可以发现出具有不同兴趣、背景的社会团体,方便进行不同的宣传策略;在交易网中,不同的社区代表不同购买力的客户群体,方便运营为他们推荐合适的商品;在资金网络中,社区有可能是潜在的洗钱团伙、刷钻联盟,方便安全部门进行相应处理;在相似店铺网络中,社区发现可以检测出商帮、价格联盟等,对商家进行指导等等。总的来看,社区发现在各种具体的网络中都能有重点的应用场景,图1展示了基于图的拓扑结构进行社区发现的例子。
图1. 基于图的拓扑结构进行社区发现
社区发现作为网络科学的经典问题之一,长期受到研究者的广泛关注。
除了上述方法外,也有不少社区发现的算法是基于优化Modularity值的方向进行的。
Modularity值 用于评估社区发现的效果,对比社区发现结果与随机图(Null Model)的差异。对于同一个输入图进行不同的社区发现策略,取得Modularity值较高的策略性能较好。具体计算公式如下:
其中,Aij 表示节点i与节点j之间的边的权重;ki 表示所有连接到节点i的边的权重之和;ci 表示当前节点i归属的社区;而当u等于v时,函数δ(u,v)的值为1,否则为0。
对上述公式进行化简,如下:
其中,∑in 表示一个社区内部的连线数,∑tot 表示一个社区所有节点的度数之和。对比公式(1),它少了判断两个节点是否属于同一个社区的δ(u,v)函数,在后面的章节中我们可以看到,这种化简带来的计算量上的好处。
综合数据规模、运行时间等多方面的考虑,本文选择Blondel等人提出的 FastUnfolding算法 进行实现。算法的基本步骤如下:
1.初始化,将每个节点划分在不同的社区中。
2.逐一选择各个节点,根据公式(3)计算将它划分到它的邻居社区中得到的Modularity增益。如果最大增益大于0,则将它划分到对应的邻居社区;否则,保持归属于原社区。
3.重复步骤2,直到节点的社区不再发生变化。
4.构建新图。新图中的点代表上一阶段产生的不同社区,边的权重为两个社区中所有节点对的边权重之和。重复步骤2,直到获得最大的Modularity值。
可以将上述步骤分为两阶段(Pass)
图2.FastUnfolding算法示意图
原始的FastUnfolding算法采用的是串行化实现方式:逐个选择节点,重新计算它的社区,不断进行迭代。这种串行化的计算方式,对分布式计算框架非常不友好,因为在选择一个节点计算它的增益的时候,其它的节点是不能进行变化的。这样不能进行并行化计算,也不能充分利用分布式框架的高并发、集群计算优势。另外Spark对于这种细粒度的操作,也非常的不合适,它为了改变单个结点的值,也需重新生成一个包含所有数据的RDD,开销非常的大。
为了将算法搬到分布式框架和集群上运行,我们需要对算法进行并行化改造。并行化的算法实现,会在每轮迭代中同步更新多个节点的信息,即根据t-1轮中邻居节点的信息来更新t轮中节点的信息,从而充分发挥高并发性的优势。
对照FastUnfolding的算法思路,定义一个新的数据结构VertexData,结构如下:
< code > class VertexData ( ) extends Serializable {var degree : Int = 0 // 该节点度值var community : Long = 0 // 该节点所属社区var communityDegreeSum : Long = - 1 // 该社区的度数之和var neighDegree : Int = 0 // 目标节点的度值var neighCommunity : Long = - 1 // 目标节点所属社区var neighCommunityDegreeSum : Long = - 1 // 目标节点的社区总权重var edgeCount : Long = - 1 // 该节点与目标节点的连线条数……}< / code > |
---|
有了上述信息,就可以记录当前节点及它的邻居节点的信息。同时,为了提升性能,后续还可以使用kryo的序列化方法来替代Java的Serializable方法,获得时间和空间性能上的提升。
整体上,我们使用mrTriplets函数来实现算法,在map阶段,每个节点生成它所有邻居节点的VertexData消息,在reduce阶段将其合并,组成一个数组,包含这个节点的所有邻居信息。有了一个节点的所有邻居信息后,我们就可以使用公式(3)来计算它新归属的社区。上述操作对应于1st Pass的过程。
< code > // 初始化图,每个节点置于不同的社区var newGraph = generateInitGraph ( graph , degreeSum ) . cache ( )do {// 每个节点获得邻居节点的信息val vertexRdd = newGraph . mapReduceTriplets ( edgeMapFunc , _ ++ _ ) . cache( )// 根据上一轮中邻居的信息,更新节点的社区val idCommunity = vertexRdd . map {case ( vid , vdArray ) = > ( vid , getBestCommunity ( vdArray , curDegree ) )} . cache ( )// 根据新的节点社区,获得更新信息val updateMessage = getUpdateMessage ( idCommunity )// 更新图newGraph = newGraph . joinVertices ( updateMessage ) { . . . }. . .} while ( changeRate > minThreshold && i < maxIterations )< / code > |
---|
对于2nd Pass的操作,直接使用RDD处理起来更为直观。当前图的边信息保存在edgeRdd中,每行为节点对< srcId, dstId >。同时,我们将节点Id与它归属的社区信息保存在communityRdd中,每行为节点对< nodeId, communityId >。所以,执行两次的leftOuterJoin即可得到新图的边信息,具体代码如下:
< code > edgeRdd . leftOuterJoin ( communityRdd ). map { case ( srcId , ( dstId , srcComm ) ) = > ( dstId , srcComm . getOrElse ( 0L )) }. leftOuterJoin ( communityRdd ). map { case ( dstId , ( srcComm , dstComm ) ) = > ( srcComm , dstComm . getOrElse ( 0L ) ) }< / code > |
---|
有了新图的边信息后,使用Graph.fromEdgeTuples即可构建新图,完成2nd Pass的过程。
进行并行化处理时,我们主要遇到两个问题:一是中间计算量过大,二是消息滞后。
如果直接使用公式(1)进行Modularity计算,会导致中间计算量过大,因为它需要考虑两两节点对的情况(pairwise),即n平方的量级(n为节点个数),在大数据量情况下并不可行。
尝试的一个解决方法是,进行分步计算,如根据节点Id的hash值将数据划分成100个分区,每次只对分区内的节点进行计算。但是这种方法处理不直观,效率也不高。
经过反复尝试后,我们发现,更好的解决方法是使用化简后的公式(2)进行处理,避免了pairwise的过程。
由于在并行化处理时,在t轮时每个节点根据t-1轮时的邻居社区信息进行更新,存在一定的消息滞后现象,会造成 “互换社区” 的问题,示意图如下:
图3. “互换社区”问题示意图
变化情况如图3所示:
类似的,还会存在有 “社区归属延迟” 问题。示意图如图4所示。节点1的归属社区受到节点2的影响,归属到社区2。但是节点2的社区也在同步变化,它可能归属于社区3,这样就造成只有节点1归属到社区2,成为一个孤立的点。
图4.”社区归属延迟”问题示意图
考虑有以下两种解决策略:
对比上面两种方法,后一种策略充分考虑了图的特性,更为可取,能够保证结果的稳定性。大致代码如下:
< code > // 根据原始的<id, community>信息构建新图val rawG = Graph . fromEdgeTuples ( rawIdCommnity , 1 )// 获得连通区域val connetedComponent = rawG . connectedComponents ( ) . vertices// 得到最终结果val idCommunity = rawIdCommunity . join ( connetedComponent ) . map {case ( id , ( rawCommunity , newCommunity ) ) = > ( id , newCommunity )}< / code > |
---|
FastUnfolding算法,基于结果Modularity值的优化进行,得到的社区发现效果比较理想,对比LPA算法会更稳定。并且,FastUnfolding算法会不断合并节点构造新图,大大减少了计算量,使得大规模图数据的计算成为可能。
原始的FastUnfolding算法采用串行化的实现思路,不适合面对海量数据。实现中需要进行算法并行化,充分利用并行化框架带来的计算优势。在将传统的串行化算法改造成并行化算法的过程中时,会遇到中间计算量过大、消息滞后造成的问题,如“互换社区”和“社区归属延迟”问题。解决的思路是考虑图的特性,对结果再次求解连通图区域,并通过重置社区得到最终结果。这样既保证了算法的准确性,又保证其性能,从而能够在大规模的网络上,进行实际的生产应用。
根据我们的初步测评,在三千万的用户数据上,可以在2个小时的级别,发现四万的社区,基本满足生产预期。