项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合教学体系的一体化的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤算法以及基于内容的推荐方法来提供混合推荐。提供了从前端应用、后台服务、算法设计实现、平台部署等多方位的闭环的业务实现。
用户可视化:主要负责实现和用户的交互以及业务数据的展示,主体采用 AngularJS2 进行实现,部署在 Apache 服务上。
综合业务服务:主要实现 JavaEE 层面整体的业务逻辑,通过 Spring 进行构建,对接业务需求。部署在 Tomcat 上。
【数据存储部分】
业务数据库:项目采用广泛应用的文档数据库 MongDB 作为主数据库,主要负责平台业务逻辑数据的存储。
缓存数据库:项目采用 Redis 作为缓存数据库,主要用来支撑实时推荐系统部分对于数据的高速获取需求。
【离线推荐部分】
离线统计服务:批处理统计性业务采用 Spark Core + Spark SQL 进行实现,实现对指标类数据的统计任务。
离线推荐服务:离线推荐业务采用 Spark Core + Spark MLlib 进行实现,采用 ALS 算法进行实现。
【实时推荐部分】
日志采集服务:通过利用 Flume-ng 对业务平台中用户对于商品的一次评分行为进行采集,实时发送到 Kafka 集群。
消息缓冲服务:项目采用 Kafka 作为流式数据的缓存组件,接受来自 Flume 的数据采集请求。并将数据推送到项目的实时推荐系统部分。
实时推荐服务:项目采用 Spark Streaming 作为实时推荐系统,通过接收 Kafka 中缓存的数据,通过设计的推荐算法实现对实时推荐的数据处理,并将结构合并更新到 MongoDB 数据库。
【系统初始化部分】
0、通过 Spark SQL 将系统初始化数据加载到 MongoDB 中。
【离线推荐部分】
1、离线统计服务从 MongoDB 中加载数据,将【商品平均评分统计】、【商品评分个数统计】、【最近商品评分个数统计】三个统计算法进行运行实现,并将计算结果回写到 MongoDB 中;离线推荐服务从 MongoDB 中加载数据,通过 ALS 算法分别将【用户推荐结果矩阵】、【影片相似度矩阵】回写到 MongoDB 中。
【实时推荐部分】
2、Flume 从综合业务服务的运行日志中读取日志更新,并将更新的日志实时推送到 Kafka 中;Kafka 在收到这些日志之后,通过 kafkaStream 程序对获取的日志信息进行过滤处理,获取用户评分数据流【UID|PID|SCORE|TIMESTAMP】,并发送到另外一个 Kafka 队列;Spark Streaming 监听 Kafka 队列,实时获取 Kafka 过滤出来的用户评分数据流,融合存储在 Redis 中的用户最近评分队列数据,提交给实时推荐算法,完成对用户新的推荐结果计算;计算完成之后,将新的推荐结构和 MongDB 数据库中的推荐结果进行合并。
【业务系统部分】
3、推荐结果展示部分,从 MongoDB 中将离线推荐结果、实时推荐结果、内容推荐结果进行混合,综合给出相对应的数据。
4、商品信息查询服务通过对接 MongoDB 实现对商品信息的查询操作。
5、商品评分部分,获取用户通过 UI 给出的评分动作,后台服务进行数据库记录后,一方面将数据推动到 Redis 群中,另一方面,通过预设的日志框架输出到 Tomcat 中的日志中。
6、商品标签部分,项目提供用户对商品打标签服务。
各数据表解析
数据源解析 以及 主要数据模型
我们的项目中用到了多种工具进行数据的存储、计算、采集和传输,本章主要简单介绍设计的工具环境搭建。
如果机器的配置不足,推荐只采用一台虚拟机进行配置,而非完全分布式,将该虚拟机CPU的内存设置的尽可能大,推荐为CPU > 4、MEM > 4GB。
参考链接:https://cloud.tencent.com/developer/article/1431268
我们的项目主体用 Scala 编写,采用 IDEA 作为开发环境进行项目编写,采用 maven 作为项目构建和管理工具。
打开 IDEA,创建一个 maven 项目,命名为 ECommerceRecommendSystem。为了方便后期的联调,我们会把业务系统的代码也添加进来,所以我们可以以 ECommerceRecommendSystem 作为父项目,并在其下建一个名为 recommender 的子项目,然后再在下面搭建多个子项目用于提供不同的推荐服务。
在 ECommerceRecommendSystem 下新建一个 maven module 作为子项目,命名为 recommender。同样的,再以 recommender 为父项目,新建一个 maven module 作为子项目。我们的第一步是初始化业务数据,所以子项目命名为 DataLoader。
父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以 ECommerceRecommendSystem 和 recommender 下的 src 文件夹都可以删掉。
目前的整体项目框架如下:
我们整个项目需要用到多个工具,它们的不同版本可能会对程序运行造成影响,所以应该在最外层的 ECommerceRecommendSystem 中声明所有子项目共用的版本信息。
在pom.xml中加入以下配置:
ECommerceRecommendSystem/pom.xml
<properties>
<log4j.version>1.2.17</log4j.version><!-- 日志的具体实现 -->
<slf4j.version>1.7.22</slf4j.version><!-- 日志接口 -->
<mongodb-spark.version>2.0.0</mongodb-spark.version><!-- mongodb 与 spark 之间的连接器 -->
<casbah.version>3.1.1</casbah.version><!-- mongodb 与 scala 之间的 Driver -->
<redis.version>2.9.0</redis.version>
<kafka.version>0.10.2.1</kafka.version>
<spark.version>2.1.1</spark.version>
<scala.version>2.11.8</scala.version>
<jblas.version>1.2.1</jblas.version><!-- java 线性代数的库 -->
</properties>
首先,对于整个项目而言,应该有同样的日志管理,我们在 ECommerceRecommendSystem 中引入公有依赖:
ECommerceRecommendSystem/pom.xml
<dependencies>
<!-- 引入共同的日志管理工具 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
同样,对于 maven 项目的构建,可以引入公有的插件:
<build>
<!-- 声明并引入子项目共有的插件 -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<!-- 所有的编译用 JDK1.8 -->
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<!-- 声明但不引入子项目共有的插件,子项目如果依赖需要自行引入 -->
<plugins>
<!-- maven 的打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 该插件用于将 scala 代码编译成 class 文件-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<!-- 绑定到 maven 的编译阶段-->
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
然后,在 recommender 模块中,我们可以为所有的推荐模块声明 spark 相关依赖(这里的 dependencyManagement 表示仅声明相关信息,子项目如果依赖需要自行引入):
ECommerceRecommendSystem/recommender/pom.xml
<dependencyManagement>
<!-- 声明但不引入子项目共有的插件,子项目如果依赖需要自行引入 -->
<dependencies>
<!-- 引入 Spark 相关的 Jar 包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
由于各推荐模块都是 scala 代码,还应该引入 scala-maven-plugin 插件,用于 scala 程序的编译。因为插件已经在父项目中声明,所以这里不需要再声明版本和具体配置:
<build>
<plugins>
<!-- 父项目已声明该 plugin,子项目在引入的时候,不用声明版本和已经声明的配置 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
对于具体的 DataLoader 子项目,需要 spark 相关组件,还需要 mongodb 的相关依赖,我们在 pom.xml 文件中引入所有依赖(在父项目中已声明的不需要再加详细信息):
ECommerceRecommendSystem/recommender/DataLoader/pom.xml
<!-- 对于具体的子项目而言,需要 spark 相关组件,还需要 mongodb 的相关依赖,我们引入所有依赖(在父项目中已声明的不需要再加详细信息) -->
<dependencies>
<!-- Spark 的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驱动 -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
至此,我们做数据加载需要的依赖都已配置好,可以开始写代码了。
在 src/main/ 目录下,可以看到已有的默认源文件目录是 java,我们可以将其改名为 scala。将数据文件 products.csv,ratings.csv 复制到资源文件目录 src/main/resources 下,我们将从这里读取数据并加载到 mongodb 中。
数据格式:
productId,name,categoryIds,amazonId,imageUrl,categories,tags
例如:
3982^Fuhlen 富勒 M8眩光舞者时尚节能无线鼠标(草绿)(眩光.悦动.时尚炫舞鼠标 12个月免换电池 高精度光学寻迹引擎 超细微接收器10米传输距离)^1057,439,736^B009EJN4T2^https://images-cn-4.ssl-images-amazon.com/images/I/31QPvUDNavL._SY300_QL70_.jpg^外设产品|鼠标|电脑/办公^富勒|鼠标|电子产品|好用|外观漂亮
Products 数据集有 7 个字段,每个字段之间通过 “^” 符号进行分割。其中的 categoryIds、amazonId 对于内容特征没有实质帮助,我们只需要其它5个字段:
数据格式:
userId,prudcutId,rating,timestamp
例如:
4867,457976,5.0,1395676800
Rating 数据集有 4 个字段,每个字段之间通过 “,” 分割。
log4j 对日志的管理,需要通过配置文件来生效。在 src/main/resources 下新建配置文件 log4j.properties,写入以下内容:
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
我们会为原始数据定义几个样例类,通过 SparkContext 的 textFile 方法从文件中读取数据,并转换成 DataFrame,再利用 Spark SQL 提供的 write 方法进行数据的分布式插入。
在 DataLoader/src/main/scala 下新建 package,命名为 com.atguigu.recommender,新建名为 DataLoader 的 scala 单例 object 对象文件。
程序主体代码如下:
DataLoader/src/main/scala/com.atguigu.recommerder/DataLoader.scala
// 定义样例类
case class Product(productId: Int, name: String, imageUrl: String, categories: String, tags: String)
// 注意:spark mllib 中有 Rating 类,为了便于区别,我们重新命名为 ProductRating
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
object DataLoader {
// 定义数据文件路径
val PRODUCT_DATA_PATH = "D:\\learn\\JetBrains\\workspace_idea3\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\products.csv"
val RATING_DATA_PATH = "D:\\learn\\JetBrains\\workspace_idea3\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"
// 定义 MongoDB 中存储的表名
val MONGODB_PRODUCT_COLLECTION = "Product"
val MONGODB_RATING_COLLECTION = "Rating"
def main(args: Array[String]): Unit = {
// 定义用到的配置参数
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 创建一个 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
// 创建一个 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 创建一个 sparkContext
val sc = spark.sparkContext
// 加入隐式转换:在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包进行支持
import spark.implicits._
// 将 Products、Ratings 数据集加载进来
val productRDD = sc.textFile(PRODUCT_DATA_PATH)
// 将 prodcutRDD 装换为 DataFrame
val productDF = productRDD.map(item => {
// productId,name,categoryIds,amazonId,imageUrl,categories,tags
val attr = item.split("\\^")
// 取出数据,转换成样例类
Product(attr(0).trim.toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim)
}).toDF()
val ratingRDD = sc.textFile(RATING_DATA_PATH)
//将 ratingRDD 转换为 DataFrame
val ratingDF = ratingRDD.map(item => {
// userId,prudcutId,rating,timestamp
val attr = item.split(",")
// 取出数据,转换成样例类
ProductRating(attr(0).trim.toInt, attr(1).trim.toInt, attr(2).trim.toDouble, attr(3).trim.toInt)
}).toDF()
// 声明一个隐式的配置对象,方便重复调用(当多次调用对 MongoDB 的存储或读写操作时)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 将数据保存到 MongoDB 中
storeDataInMongDB(productDF, ratingDF)
// 关闭 Spark
spark.stop()
}
接下来,实现 storeDataInMongo 方法,将数据写入 mongodb 中:
/**
* 将数据写入 MongoDB 中
*
* @param productDF
* @param ratingDF
* @param mongoConfig
*/
def storeDataInMongDB(productDF: DataFrame, ratingDF: DataFrame)(implicit mongoConfig: MongoConfig) = {
// 创建一个到 MongoDB 的连接客户端
val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))
// 定义通过 MongoDB 客户端拿到的表操作对象
val productCollection = mongoClient(mongoConfig.db)(MONGODB_PRODUCT_COLLECTION)
val ratingCollection = mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
// 如果 MongoDB 中已有对应的表,那么应该删除
productCollection.dropCollection()
ratingCollection.dropCollection()
// 将当前数据写入到 MongoDB 对应的表中
productDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
ratingDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 对数据表创建索引
productCollection.createIndex(MongoDBObject("productId" -> 1))
ratingCollection.createIndex(MongoDBObject("userId" -> 1))
ratingCollection.createIndex(MongoDBObject("productId" -> 1))
// 关闭 MongoDB 的连接
mongoClient.close()
}