一. 背景
二. 开发实现
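1. 采集到的用户数据样例如下(每行一条 JSON 记录;下面是根据代码读取的字段整理的结构示意,值仅为占位):
{"uid": "u_001", "phone": {"imei": "...", "mac": "...", "imsi": "...", "androidId": "...", "deviceId": "...", "uuid": "..."}}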
2. 实现方案
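通常我们采集的数据中包含多个唯一 ID 字段,例如用户 ID、电子邮件,以及移动 APP 中常用的设备唯一标识(imei, imsi, mac, Device ID 等)。通过对这些唯一 ID 字段做关联映射(ID Mapping),识别哪些记录属于同一个人,并为其生成统一的 OneId。具体做法是:把每个 ID 作为图中的一个顶点,同一条记录中出现的 ID 两两连边(示例代码只保留共同出现超过 2 次的 ID 对),再用连通分量(Connected Components)算法找出每组相互关联的 ID,并以该连通分量的标识(组内最小的顶点 ID)作为 OneId。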
3.1 初始化 OneId
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
 * First-run OneId initialization.
 *
 * Reads a JSON-lines file of user records and collects each record's non-blank IDs
 * (uid plus the device IDs under "phone"). Every ID becomes a graph vertex, and IDs
 * that co-occur in more than two records are linked. Each vertex is then labelled with
 * its GraphX connected-component id, which serves as that ID's guid (OneId).
 *
 * NOTE(review): this listing is an extraction-damaged excerpt. Several closing `})` / `}`
 * lines and the end of the SparkSession builder chain are not present, so it does not
 * compile as shown.
 */
object IdMapFirst {
def main(args: Array[String]): Unit = {
// NOTE(review): SparkSession.builder() returns a SparkSession.Builder, not a SparkSession.
// The chain presumably ended with .getOrCreate() in the full source — confirm.
val spark: SparkSession = SparkSession.builder()
import spark.implicits._
// One JSON document per line.
val rawData = spark.read.textFile("file:///E://code//study//spark_ml//file//userInfo.json")
// Per record: the non-blank IDs among uid and the six "phone" fields, in a fixed order.
val data: RDD[Array[String]] = rawData.rdd.map(line => {
val jsonObj = JSON.parseObject(line)
// Get the user object from the JSON object (disabled: fields are read from the top level).
// val userObj = jsonObj.getJSONObject("user")
val uid = jsonObj.getString("uid")
// Get the phone object, read directly from the top-level JSON object.
// NOTE(review): if a record has no "phone" key, phoneObj is presumably null and the
// getOrDefault calls below would throw — confirm every input line carries it.
val phoneObj = jsonObj.getJSONObject("phone")
// Absent keys default to "" and are removed by the isNotBlank filter below.
val imei = phoneObj.getOrDefault("imei","").toString
val mac = phoneObj.getOrDefault("mac","").toString
val imsi = phoneObj.getOrDefault("imsi","").toString
val androidId = phoneObj.getOrDefault("androidId","").toString
val deviceId = phoneObj.getOrDefault("deviceId","").toString
val uuid = phoneObj.getOrDefault("uuid","").toString
// isNotBlank also drops a null uid.
Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
// Vertices: (ID string's hashCode widened to Long, ID string).
// NOTE(review): String.hashCode is only 32-bit. Two different ID strings can collide onto
// one vertex and silently merge unrelated users; a wider hash would avoid this. Any change
// must also be applied wherever these ids are re-hashed later.
val vertices: RDD[(Long, String)] = data.flatMap(arr => {
for (id <- arr) yield (id.hashCode.toLong, id)
// Debug output. foreach is a Spark action, so on a cluster this prints in executor logs.
vertices.foreach(ele => println(ele._1 + " : " + ele._2))
// Edges: one per pair (i < j) of IDs within the same record, counted across all records.
// Only pairs seen in more than 2 records (i.e. at least 3) are kept.
// NOTE(review): the threshold looks like a noise filter — confirm it is intended. IDs that
// co-occur in only one or two records are never linked and each stays its own group.
val edges: RDD[Edge[String]] = data.flatMap(arr => {
for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
.map(edge => (edge, 1)).reduceByKey(_ + _)
.filter(tp => tp._2 > 2)
.map(x => x._1)
// Build a graph from the vertex set and the edge set, then run a GraphX algorithm on it.
val graph = Graph(vertices,edges)
// Run connected components: VertexRDD[VertexId] ==> an RDD of tuples
// (vertex id as Long, smallest vertex id in its component).
val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
// (id hash, guid) as a DataFrame.
// NOTE(review): this excerpt never writes it out. Presumably it is saved as the parquet
// that IdMapSecond later reads with columns "id" and "guid" — confirm.
val firstIds = res.toDF("id","guid")
3.2 第二批数据与已有 OneId 结果合并
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
/**
 * Incremental OneId merge.
 *
 * Rebuilds the ID graph from the new input plus the previously saved (id, guid) mapping.
 * Each resulting component inherits an existing guid when any of its IDs already had one,
 * and otherwise keeps its new component id. Finally, the hashed ids are joined back to
 * their original ID strings.
 *
 * NOTE(review): this listing is an extraction-damaged excerpt. Closing `})` / `}` / `)`
 * lines, the end of the SparkSession builder chain, and some intermediate steps are
 * missing, so it does not compile as shown.
 */
object IdMapSecond {
def main(args: Array[String]): Unit = {
// NOTE(review): SparkSession.builder() returns a SparkSession.Builder. It is presumably
// followed by .getOrCreate() in the full source — confirm.
val spark: SparkSession = SparkSession.builder()
// NOTE(review): this is the same input path IdMapFirst reads. It presumably should point
// at the new (second) batch of data — confirm.
val rawData = spark.read.textFile("file:///E://code//study//spark_ml//file//userInfo.json")
// Per record: the non-blank IDs among uid and the six "phone" fields, in a fixed order.
val data: RDD[Array[String]] = rawData.rdd.map(line => {
val jsonObj = JSON.parseObject(line)
// Get the user object from the JSON object (disabled: fields are read from the top level).
// val userObj = jsonObj.getJSONObject("user")
val uid = jsonObj.getString("uid")
// Get the phone object, read directly from the top-level JSON object.
// NOTE(review): a record without "phone" presumably yields null here and then throws.
val phoneObj = jsonObj.getJSONObject("phone")
val imei = phoneObj.getOrDefault("imei","").toString
val mac = phoneObj.getOrDefault("mac","").toString
val imsi = phoneObj.getOrDefault("imsi","").toString
val androidId = phoneObj.getOrDefault("androidId","").toString
val deviceId = phoneObj.getOrDefault("deviceId","").toString
val uuid = phoneObj.getOrDefault("uuid","").toString
Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
// Vertices: (32-bit String.hashCode widened to Long, ID string). This must match the hashing
// used when the previous mapping was built, or old and new ids will not line up.
val vertices: RDD[(Long, String)] = data.flatMap(arr => {
for (id <- arr) yield (id.hashCode.toLong, id)
// Debug output (runs inside a Spark action, i.e. on executors).
vertices.foreach(ele => println(ele._1 + " : " + ele._2))
// Edges: ID pairs from the same record that co-occur in more than 2 records.
val edges: RDD[Edge[String]] = data.flatMap(arr => {
for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
.map(edge => (edge, 1)).reduceByKey(_ + _)
.filter(tp => tp._2 > 2)
.map(x => x._1)
// Previous mapping: rows with columns "id" (hashed ID) and "guid" (its assigned OneId).
val firstIdmap = spark.read.parquet("file:///E://code//study//spark_ml//file//userIds_demo2")
// Previously known IDs also become vertices, each with an empty attribute.
// NOTE(review): a pattern-matching lambda normally needs braces (`map { case ... }`).
val firstVertices = firstIdmap.rdd.map(
case Row(id_hashcode: VertexId, guid: VertexId) =>
(id_hashcode, "")
// Previous mapping as edges. The body is truncated here; it presumably builds
// Edge(id_hashcode, guid, "") so that IDs which shared a guid stay connected.
val firstEdges = firstIdmap.rdd.map(row => {
val id_hashcode = row.getAs[VertexId]("id")
val guid = row.getAs[VertexId]("guid")
// Build the graph from the combined vertex and edge sets.
val graph = Graph(vertices.union(firstVertices),edges.union(firstEdges))
// result: VertexRDD[VertexId] => rdd(vertex id as Long, smallest vertex id in its component)
val result: VertexRDD[VertexId] = graph.connectedComponents().vertices
// Previous id -> guid lookup (body truncated).
// NOTE(review): bcMap.value is used below as a Scala Map (get(...).isDefined), and
// SparkContext.broadcast rejects RDDs. This was therefore presumably collected to the
// driver (e.g. collectAsMap()) before broadcasting — confirm.
val idMap = firstIdmap.rdd.map(row => {
val id_hashcode = row.getAs[VertexId]("id")
val guid = row.getAs[VertexId]("guid")
val bcMap = spark.sparkContext.broadcast(idMap)
import spark.implicits._
// For each component: if any of its IDs already has a guid in the previous mapping, reuse
// the first one found (the `if !idFind` guard skips lookups after the first hit).
// Otherwise keep today's component id. Then emit one (id, guid) row per ID.
// NOTE(review): tup._2 is iterated below as a collection of ids, so a grouping step
// (e.g. groupByKey on the swapped (component, id) pairs) appears to be missing after the
// first map.
val todayIdmap = result.map(tup => (tup._2,tup._1))
.mapPartitions( iter => {
iter.map(tup => {
val idmapMap = bcMap.value
var todayGuid = tup._1
val ids = tup._2
var idFind = false
for (id <- ids if !idFind) {
val getGuid = idmapMap.get(id)
if (getGuid.isDefined) {
todayGuid = getGuid.get
idFind = true
(todayGuid, ids)
.flatMap(tup => {
val ids = tup._2
val guid = tup._1
for(ele <- ids) yield (ele, guid)
}).toDF("id", "guid")
// (id hash, original ID string) for every ID occurrence in the new data. It is used to
// translate hashes back to readable IDs and still contains duplicates.
val data2 = data.flatMap( arr => {
for(id <- arr) yield (id.hashCode.toLong, id)
}).toDF("id", "str_id")
// Join the guid assignments back to the ID strings. GROUP BY over all selected columns
// removes the duplicate rows caused by IDs that occur in several records.
// NOTE(review): the views "id_guid" and "id_original" are not registered in this excerpt.
// They are presumably todayIdmap and data2 respectively — confirm.
val output = spark.sql("""select
| t1.id as id_hashcode,
| t2.str_id as id,
| t1.guid as guid
|from id_guid t1
|left join id_original t2 on t1.id = t2.id
|group by t1.id, t2.str_id, t1.guid
|order by guid""".stripMargin
+-----------+---------------+-----------+
|id_hashcode|             id|       guid|
+-----------+---------------+-----------+
|  110929767|          u_001|-1908595409|
|-1908595409|    uuid_zs_001|-1908595409|
|-1018465903|    imsi_zs_002|-1908595409|
|-1753513447|deviceId_zs_001|-1908595409|
| -714652388|     mac_zs_002|-1908595409|
|-1908595408|    uuid_zs_002|-1908595408|
|-1884715312|    imei_ls_001|-1884715312|
|-1884715311|    imei_ls_002|-1884715311|
| 2140645735|deviceId_ls_001|-1782473361|
|-1419274017|    imsi_ls_002|-1782473361|
|  110929768|          u_002|-1782473361|
| 1985563773|    uuid_ls_001|-1782473361|
|-1115460503|     mac_ls_001|-1782473361|
|-1483907198|    imei_zs_001|-1483907198|
|-1483907197|    imei_zs_002|-1483907197|
|-1419274018|    imsi_ls_001|-1419274018|
+-----------+---------------+-----------+
// Join hashed ids back to their original ID strings, ordered by guid descending; GROUP BY
// over all selected columns removes duplicate rows.
// NOTE(review): "id_guid" and "id_original" must already be registered as temp views; that
// step is not shown here. The closing `)` of spark.sql(...) and the closing `})` of the
// map below are also missing from this excerpt.
val output = spark.sql("""select
| t1.id as id_hashcode,
| t2.str_id as id,
| t1.guid as guid
|from id_guid t1
|left join id_original t2 on t1.id = t2.id
|group by t1.id, t2.str_id, t1.guid
|order by guid desc""".stripMargin
// Per row: (original ID string, OneId as a decimal string, MD5 hex digest of that string).
// NOTE(review): t2.str_id comes from a LEFT JOIN, so "id" can be null when a hash has no
// matching original string. DigestUtils is not among the imports shown; it is presumably
// org.apache.commons.codec.digest.DigestUtils from commons-codec — confirm.
val info: RDD[(String, String, String)] = output.rdd.map(
row => {
val id = row.getAs[String]("id")
val oneId = row.getAs[Long]("guid").toString
val oneId_Md5 = DigestUtils.md5Hex(oneId)
(id, oneId,oneId_Md5)
