前言:ML是Spark里面的机器学习库。它使得基础的一些机器学习算法变得简单易用。Spark提供了常规的机器学习算法,比如说分类、回归、聚类、协同过滤算法。还包含了类似于sk-learn中的一些特征提取以及特征选择。
现在我们基于Spark2.21这个版本,主要是介绍基于DataFrame的机器学习该如何实现,基本参照官网上的例子。来实现一遍,以便学习。在介绍一下相关的算法。
首先我们先介绍一些什么是线性回归,是根据自变量x1,x2...xn以及应变量y的实际样本,去估计两者之间近似的函数关系f,类似于我们有三个点(1,1) (2,2) (3,3)代表的是xy,我们需要找到一个函数来满足这三个点的一个f,很明显我们可以找到是y=x。当自变量x只有一个时,我们称之为一元线性回归,多个就是多元线性回归。
回归是一种监督学习,监督学习分两种,一种是回归一种就是分类,回归主要是用于预测连续值的比如说根据特征预测房价呀,分类就是类别判断,比如说垃圾邮件识别。
线性回归的表示方式是:
f(x) = w1X1 + w2X2 + ...+wnXn+b
其实就是我们初中学习的一元方程式。w1是斜率,b是截距。
我们需要做的就是找出最优的w值和b值,使得我们的预测值误差最小。
举个简单的列子:
房价预测,我们收集了一些房价的信息,包括房子的大小(hd)、房子的地理位置(hs)、房子的房间个数(hr)、房子的楼层(hl)等等。然后我们收集了它对应的房价。
我们就需要找到一个函数,通过这些已知的数据。
f(x) = w1*hd + w2*hs + w3*hr + w4*hl +..+ b
通过已知数据慢慢拟合找到最合适的w组以及b。
然后就是优化,一般的优化我们经常才有梯度下降的方式去优化。最小二乘法。
大概就是这个样子。然后我们就是用Spark来简单的演练一下吧。
Spark的实现
我们先看看源码:这是Spark的一个优化W和b的过程
/**
* Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
* of the objective function.
*
*@param instanceThe instance of data point to be added.
*@returnThis LeastSquaresAggregator object.
*/
defadd(instance: Instance):this.type= {
instancematch{caseInstance(label,weight,features) =>
if(weight ==0.0)return this
valdiff =dot(features,effectiveCoefficientsVector) - label / labelStd +offset
if(diff !=) {
vallocalGradientSumArray =gradientSumArray
vallocalFeaturesStd =featuresStd
features.foreachActive { (index,value) =>
if(localFeaturesStd(index) !=0.0&& value !=0.0) {
localGradientSumArray(index) += weight * diff * value / localFeaturesStd(index)
}
}
lossSum+= weight * diff * diff /2.0
}
totalCnt+=1
weightSum+= weight
this
}
}
然后还有个预测:
override protected defpredict(features: Vector):Double= {
dot(features,coefficients) + intercept
}
就是w*x + b
保存:
override protected defsaveImpl(path:String):Unit= {
// Save metadata and Params
DefaultParamsWriter.saveMetadata(instance,path,sc)
// Save model data: intercept, coefficients
valdata =Data(instance.intercept,instance.coefficients)
valdataPath =newPath(path,"data").toString
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}
}
保存的就是w和b两个变量,所以说我们在外面看着很陌生,一旦深入进去,就觉得很简单了,这就是在求一个变量的过程。训练结果也就是我们所谓的模型其实就是这些个变量。
然后我们用Spark的例子来实现一下吧。
packageorg.apache.spark.examples
importorg.apache.spark.sql.SparkSession
importorg.apache.spark.ml.regression.LinearRegression
importorg.apache.spark.ml.evaluation.RegressionEvaluator
objectSparkDemo {
defmain(args: Array[String]) {
valspark = SparkSession
.builder
.appName("LinearRegression").master("local")
.getOrCreate()
// Load training data
valtraining = spark.read.format("libsvm").load("file:\\D:\\sample_linear_regression_data.txt")
valArray(trainData,testData) = training.randomSplit(Array(0.8,0.2))
//模型设置参数
vallr =newLinearRegression().setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
//建立模型
vallrModel = lr.fit(trainData)
//打印学习到的参数
println(s"Coefficients:$Intercept:$")
//打印一些模型的参数
valtrainingSummary = lrModel.summary
println(s"numIterations:$")
println(s"objectiveHistory: [$]")
trainingSummary.residuals.show()
println(s"RMSE:$")
println(s"r2:$")
//预测数据
valpredict = lrModel.transform(testData)
predict.show(3)
//显示测试数据的指标
println("test rmse:"+newRegressionEvaluator().setMetricName("rmse").evaluate(predict))
println("test mse:"+newRegressionEvaluator().setMetricName("mse").evaluate(predict))
println("test r2:"+newRegressionEvaluator().setMetricName("r2").evaluate(predict))
println("test mae:"+newRegressionEvaluator().setMetricName("mae").evaluate(predict))
spark.stop()
}
}
// scalastyle:on println
我们来看看里面对应的输出:
差不多这样就在Spark上实现了一个线性回归。但是呢其中还是有很多参数是需要我们进一步去调试跟进的。
下面我在介绍一下该算法在Spark中的一些参数吧。
setMaxIter()设置迭代次数。默认是100.
@Since("1.3.0")
defsetMaxIter(value:Int):this.type= set(maxIter,value)
setDefault(maxIter->100)
setTol() 设置容错,当我们随着迭代次数的增加,误差值会越来越小,当误差值下于该设置的值时,就会停止迭代。默认1E-6
@Since("1.4.0")
defsetTol(value: Double):this.type= set(tol,value)
setDefault(tol ->1E-6)
setRegParam() 设置正则化项系数,正则化项主要用于防止过拟合现象,当数据集比较小,特征维数又比较多的时候,易出现过拟合,此时可以考虑增大正则化项系数 默认0.0
@Since("1.3.0")
defsetRegParam(value:Double):this.type= set(regParam,value)
setDefault(regParam->0.0)
setElasticNetParam() 正则化范式比(默认0.0),正则化一般有两种范式:L1和L2正则。L1一般用于特征的稀疏化,L2一般用于防止过拟合。这里的参数即设置L1范式的占比,默认0.0即只使用L2范式
@Since("1.4.0")
defsetElasticNetParam(value:Double):this.type= set(elasticNetParam,value)
setDefault(elasticNetParam->0.0)
setFeaturesCol() 设置特征列 默认是“features”
setLabelCol() 设置标签列 默认是“label”
setWeightCol() 设置权重,这里是设个各个参数的权重比,默认都为1。
@Since("1.6.0")
defsetWeightCol(value:String):this.type= set(weightCol,value)
领取专属 10元无门槛券
私享最新 技术干货