Goal: build a model that predicts whether a dish is a dessert, using the information provided for each dish as feature columns.
The source data provides: the dish title, nutrition figures, ingredients, and the dish's description and notes.
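The snippets that follow assume roughly the imports and SparkSession below (a sketch; the exact module list depends on which steps you run, and the app name is arbitrary):
# Assumed imports for the snippets in this article (adjust to your environment).
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark import keyword_only
from pyspark.ml import Estimator, Model, Pipeline
from pyspark.ml.feature import Imputer, MinMaxScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasInputCols, HasOutputCol, HasOutputCols
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

spark = SparkSession.builder.appName('recipes_ml').getOrCreate()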
Data cleaning
Loading the DataFrame
foods = spark.read.csv('../DataSet/recipes/epi_r.csv',header=True,inferSchema=True)
1. Confirming the data's size and structure
# 12-2 Confirm the size of the data
print(foods.count(),len(foods.columns)) # 20057 680
foods.printSchema()
# rating and calories should be double; the fact that they were inferred as string means stray strings leaked into these two columns
# the column names contain spaces and special characters
root
|-- title: string (nullable = true)
|-- rating: string (nullable = true)
|-- calories: string (nullable = true)
|-- protein: double (nullable = true)
|-- fat: double (nullable = true)
…… (omitted)
|-- #cakeweek: double (nullable = true)
|-- #wasteless: double (nullable = true)
|-- 22-minute meals: double (nullable = true)
|-- 3-ingredient recipes: double (nullable = true)
|-- 30 days of groceries: double (nullable = true)
|-- bon appétit: double (nullable = true)
|-- house & garden: double (nullable = true)
|-- crêpe: double (nullable = true)
|-- sauté: double (nullable = true)
|-- snack week: double (nullable = true)
|-- turkey: double (nullable = true)
2. Normalizing column names
# Requirement: normalize to lowercase letters, digits, and underscores
# Replace special characters with underscores, and replace [&] with [and]
def sanitize_column_name(name_org: str) -> str:
    re_name = name_org
    re_name = ' '.join(re_name.split()) # collapse extra whitespace
    # replace special characters
    for i,j in ((' ','_'),('&','and'),('-','_'),('\\','_'),('/','_')):
        re_name = re_name.replace(i,j)
    # keep only letters, digits, and underscores
    return (
        "".join([
            char
            for char in re_name
            if char.isalpha() or char.isdigit() or char=='_'
        ]).lower() # use isdigit rather than isnumeric: isnumeric also matches characters we do not want to keep
    )
# overwrite the old column names
foods = foods.toDF(*[sanitize_column_name(col_name) for col_name in foods.columns])
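A few quick spot-checks of the helper against names from the original schema (expected results shown as comments):
# Sanity checks for sanitize_column_name:
print(sanitize_column_name('#cakeweek'))            # cakeweek
print(sanitize_column_name('22-minute meals'))      # 22_minute_meals
print(sanitize_column_name('house & garden'))       # house_and_garden
print(sanitize_column_name('30 days of groceries')) # 30_days_of_groceries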
3. Splitting continuous and discrete columns
First, find which of the discrete columns are binary.
foods.cache()
for x in foods.columns:
    foods.select(x).summary().show()
# The output below matches what we expect from a binary column
# The mean shows the split between 0 and 1; here 0 dominates, as in the casserole_gratin column
+-------+--------------------+
|summary| casserole_gratin|
+-------+--------------------+
| count| 20052|
| mean|0.009924197087572311|
| stddev| 0.09912717808713252|
| min| 0.0|
| 25%| 0.0|
| 50%| 0.0|
| 75%| 0.0|
| max| 1.0|
+-------+--------------------+
Verification
# There are 680 columns in total, so 1000 is plenty here
pd.set_option("display.max_rows",1000)
# Aggregate and convert
# collect_set gathers a column's distinct values into an array; size (like Python's len) counts them, and == 2 flags a binary column
foods_agg = foods.agg(*[
    (F.size(F.collect_set(x)) == 2).alias(x) for x in foods.columns
])
isbinary=foods_agg.toPandas() # convert to a pandas DataFrame
print(isbinary.unstack()) # unstack into a long format for easier reading
# --------- Result ---------
title 0 False
rating 0 False
……
snack_week 0 True
turkey 0 True
anise 0 True
anniversary 0 True
anthony_bourdain 0 True
aperitif 0 True
appetizer 0 True
……
dtype: bool
Sorting the columns into their respective lists
cols = foods_agg.head().asDict()
# first identify the continuous columns
continues_cols = []
for k,v in cols.items():
    if v==False:
        continues_cols.append(k)
# whatever is left is a binary column
b_cols=set(foods.columns)-set(continues_cols)
print(len(b_cols))
print(len(continues_cols))
print(sorted(continues_cols))
# --------- Result ---------
672
8
['cakeweek', 'calories', 'fat', 'protein', 'rating', 'sodium', 'title', 'wasteless']
This leaves 672 binary columns and 8 columns provisionally treated as continuous.
Filtering the continues_cols list
# cakeweek and wasteless are recipe tags and should really be binary as well; verify this:
foods.agg(F.collect_set('cakeweek'),F.collect_set('wasteless')).show(1,False)
foods.where("cakeweek>1 or wasteless>1")\
.select('title','rating','cakeweek','wasteless').show(20,False)
# ---------结果---------
+-------------------------------+----------------------+
|collect_set(cakeweek) |collect_set(wasteless)|
+-------------------------------+----------------------+
|[0.0, 1.0, 1188.0, 24.0, 880.0]|[0.0, 1.0, 1439.0] |
+-------------------------------+----------------------+
+----------------------------------------------------------------+------------------------+-------------+--------+---------+
|title |rating |calories |cakeweek|wasteless|
+----------------------------------------------------------------+------------------------+-------------+--------+---------+
|"Beet Ravioli with Pine Nut ""Goat Cheese"" Rosemary-Cream Sauce| Aged Balsamic Vinegar "|4.375 |880.0 |0.0 |
|"Seafood ""Cataplana"" with Saffron | Vermouth | and Sorrel "|24.0 |1439.0 |
|"""Pot Roast"" of Seitan | Aunt Gloria-Style " |2.5 |1188.0 |0.0 |
+----------------------------------------------------------------+------------------------+-------------+--------+---------+
The 3 anomalous rows printed above show that not only do cakeweek and wasteless contain out-of-range values, but the strings that leaked into the double-typed rating and calories columns come from the same row misalignment in the source CSV.
4. Handling outliers and missing values, and converting types
Outlier handling
# Keep only the valid values 0 and 1, and also keep nulls so they can be imputed later
foods=foods.where(
(F.col('cakeweek').isin([0,1]) | (F.col('cakeweek').isNull()))
& ((F.col('wasteless').isin([0,1])) | (F.col('wasteless').isNull()))
)
# Verify: 3 rows removed, leaving 20054 rows and 680 columns
print(foods.count(),len(foods.columns))
Splitting the columns by role
# Re-partition the columns into groups; the binary list is whatever remains after removing the others
# identifier column
identify_column = ['title']
# continuous columns
continue_columns = ['rating','calories','protein','fat','sodium',]
# the target: a binary column, but not used as an input feature
target_columns = ['dessert']
# binary columns
binary_columns = [col
for col in foods.columns
if (col not in identify_column)
& (col not in continue_columns)
& (col not in target_columns)
]
Handling critical missing values
foods=foods.dropna(how='all',subset=[col for col in foods.columns if col not in identify_column])
foods=foods.dropna(how='any',subset=target_columns)
print(foods.count(),len(foods.columns)) # 20049 680: 5 more rows removed
Filling the remaining missing values
# In the summary output none of the binary columns has a mean above 0.5, so fill NA in the binary columns with 0
foods = foods.fillna(0.0,subset=binary_columns)
Because the binary data is heavily skewed toward 0, dropping the uninformative columns can improve model quality and training efficiency.
# A column is treated as uninformative when its count of 1s (or of 0s) is below 10
val_sum=[F.sum(col_name).alias(col_name) for col_name in binary_columns]
val_cnt_dict=foods.agg(*val_sum).head().asDict() # turn the DataFrame-wide aggregation into a plain Python dict
rows_cnt=foods.count()
rare_col=[k for k,v in val_cnt_dict.items() if v<10 or v>rows_cnt-10] # pick out the uninformative columns
print(len(rare_col)) # 167 uninformative columns
binary_columns=list(set(binary_columns)-set(rare_col))
Cleaning the continuous columns
# rating and calories contain stray strings, so a UDF is used to screen them out
@F.udf(T.BooleanType())
def is_a_number(val:str) -> bool:
    # empty/NA values pass the check (they will be imputed later)
    if not val:
        return True
    try:
        _=float(val)
    except ValueError:
        return False
    return True
# Verify:
foods.where((~is_a_number('rating')) | (~is_a_number('calories'))).show()
# --------- Result ---------
+---------+------------+
| rating| calories|
+---------+------------+
| Cucumber| and Lemon "|
+---------+------------+
Batch type conversion
# Keep only numeric or null values in rating and calories, then cast them to double
for col in ['rating','calories']:
    foods = foods.where(is_a_number(col))
    foods=foods.withColumn(col,foods[col].cast(T.DoubleType()))
5. Adding new feature columns
# Protein provides 4 kcal per gram and fat 9 kcal per gram; dividing by total calories gives each macronutrient's share of the energy
# Unlike the raw protein or fat amounts, these ratios are normalized by the recipe's total calories, so serving size does not dominate the feature
foods=(foods
.withColumn('protein_ratio',F.col('protein')*4 / F.col('calories'))
.withColumn('fat_ratio',F.col('fat')*9 / F.col('calories'))
)
# The division can produce NA (e.g. when calories is null), so fill the new columns with 0
foods=foods.fillna(0.0,subset=['protein_ratio','fat_ratio'])
# update the list of continuous columns
continue_columns+=['protein_ratio','fat_ratio']
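A quick spot-check of the two engineered columns (a sketch; output omitted here):
# Spot-check the new ratio features; for most recipes they should fall roughly in [0, 1].
foods.select('title','calories','protein_ratio','fat_ratio').show(3, False)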
Machine learning
1. Imputing null values
OLD_COLUMNS=['calories','protein','fat','sodium']
NEW_COLUMNS=['calories_i','protein_i','fat_i','sodium_i']
imputer = Imputer(
strategy='median',
inputCols=OLD_COLUMNS,
outputCols=NEW_COLUMNS
)
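The imputer is only declared here and fitted later inside the pipeline, but it can also be fitted on its own to inspect the medians it will substitute (a small sketch, not part of the original pipeline):
# Optional: fit the imputer standalone to inspect the fill-in values.
# ImputerModel.surrogateDF holds one row with the replacement value per input column.
imputer_model = imputer.fit(foods)
imputer_model.surrogateDF.show()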
2. Capping extreme feature values
Here a custom estimator is defined:
values lying more than `boundary` (later set to 2) standard deviations above or below the mean are treated as extreme and replaced by that cap or floor value.
class _ExtremeValueCapperParam(
        HasInputCol,HasOutputCol,
        HasInputCols,HasOutputCols,
        DefaultParamsWritable,DefaultParamsReadable
):
    boundary = Param(
        Params._dummy(),
        'boundary',
        'Multiple of standard deviation for the cap and floor. Default = 0.0',
        TypeConverters.toFloat,
    )
    def __init__(self,*args):
        super().__init__(*args) # forward *args so the parent mixins initialize; without DefaultParamsWritable/DefaultParamsReadable in the bases, some tools warn about the expected arguments here
        self._setDefault(boundary=0.0)
    @keyword_only
    def setParams(self,*,
                  inputCol=None,outputCol=None,
                  inputCols=None,outputCols=None,
                  boundary=0.0,cap_lt=None,floor_lt=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)
    def setInputCol(self,input_col):
        return self.setParams(inputCol=input_col)
    def setOutputCol(self,output_col):
        return self.setParams(outputCol=output_col)
    def setInputCols(self,new_inputCols):
        return self.setParams(inputCols=new_inputCols)
    def setOutputCols(self,new_outputCols):
        return self.setParams(outputCols=new_outputCols)
class ExtremeValueCapperModel(Model,_ExtremeValueCapperParam):
    cap_lt = Param(
        Params._dummy(),
        'cap_lt',
        'upper bound of the values `inputCols` can take. '
        'Values will be capped to these values.',
        TypeConverters.toListFloat
    )
    floor_lt = Param(
        Params._dummy(),
        'floor_lt',
        'lower bound of the values `inputCols` can take. '
        'Values will be floored to these values.',
        TypeConverters.toListFloat
    )
    @keyword_only
    def __init__(self,
                 inputCols=None,outputCols=None,
                 cap_lt=None,floor_lt=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
    def setCap_lt(self,your_cap):
        return self.setParams(cap_lt=your_cap)
    def setFloor_lt(self,your_floor):
        return self.setParams(floor_lt=your_floor)
    def getCap_lt(self):
        return self.getOrDefault(self.cap_lt)
    def getFloor_lt(self):
        return self.getOrDefault(self.floor_lt)
    def _transform(self,dataset):
        if not self.isSet('inputCols'):
            raise ValueError(
                'No input columns set for the '
                'ExtremeValueCapperModel'
            )
        input_columns = self.getInputCols()
        output_columns = self.getOutputCols() # plain column names (str), not Column objects
        cap_list = self.getOrDefault('cap_lt')
        floor_list = self.getOrDefault('floor_lt')
        for input_col,output_col,cap_value,floor_value in zip(input_columns,output_columns,cap_list,floor_list):
            dataset=dataset.withColumn(
                output_col,
                F.when(dataset[input_col] > cap_value,cap_value)
                .when(dataset[input_col] < floor_value,floor_value)
                .otherwise(dataset[input_col])
            )
        return dataset
class ExtremeValueCapper(Estimator,_ExtremeValueCapperParam):
    @keyword_only
    def __init__(self,
                 inputCol=None,outputCol=None,
                 inputCols=None,outputCols=None,
                 boundary=0.0):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
    def setBoundary(self,your_boundary):
        return self.setParams(boundary=your_boundary)
    def getBoundary(self):
        return self.getOrDefault(self.boundary)
    def _fit(self,dataset):
        input_columns= ([self.getInputCol()] if self.isSet('inputCol') else self.getInputCols())
        output_columns=([self.getOutputCol()] if self.isSet('outputCol') else self.getOutputCols())
        boundary = self.getBoundary()
        cap_list=[]
        floor_list=[]
        for col_in in input_columns:
            avg,std = dataset.agg(F.avg(col_in),F.stddev(col_in)).head()
            cap_value = avg + std * boundary
            floor_value = avg - std * boundary
            cap_list.append(cap_value)
            floor_list.append(floor_value)
        return (
            ExtremeValueCapperModel(
                inputCols=input_columns,
                outputCols=output_columns,
                cap_lt=cap_list,
                floor_lt=floor_list
            )
        )
limited_vals=ExtremeValueCapper(inputCols=continue_columns,outputCols=continue_columns,boundary=2)
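As a quick standalone check of the estimator (a toy sketch, not part of the original pipeline; the column names value and value_capped are made up), with boundary=1.0 anything outside mean ± 1·stddev gets clipped:
# Toy check of ExtremeValueCapper.
toy = spark.createDataFrame([(1.0,), (2.0,), (3.0,), (100.0,)], ['value'])
toy_capper = ExtremeValueCapper(inputCols=['value'], outputCols=['value_capped'], boundary=1.0)
toy_capper.fit(toy).transform(toy).show()  # 100.0 should be clipped down to roughly mean + stddev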
3 & 4. 特征值的向量化和归一化
连续值的向量化
continue_columns_i = list(set(continue_columns) - set(OLD_COLUMNS)) + NEW_COLUMNS
CONTINUE_NB = [x for x in continue_columns_i if 'ratio' not in x]
continuous_assebler = VectorAssembler(
inputCols=CONTINUE_NB,
outputCol='continuous'
)
Normalizing the continuous values
continuous_scaler = MinMaxScaler(
min=0.0,
max=1.0,
inputCol='continuous',
outputCol='continuous_scaled'
)
Assembling the continuous and binary features into a single vector
preml_assembler=VectorAssembler(
inputCols=binary_columns
+['continuous_scaled']
+['protein_ratio','fat_ratio'],
outputCol='features'
)
5. Logistic regression
lr=LogisticRegression(
featuresCol='features',
labelCol='dessert',
predictionCol='prediction'
)
Finally, chain the steps above into a pipeline
foods_pipeline=Pipeline()
foods_pipeline.setStages(
[
imputer,
limited_vals,
continuous_assebler,continuous_scaler,
preml_assembler,
lr
]
)
# 70% of the data for training, 30% for testing, random seed 3
train,test = foods.randomSplit([0.7,0.3],3)
train.cache()
foods_model=foods_pipeline.fit(train)
results=foods_model.transform(test)
results.cache()
6. Initial validation of the classifier
evaluator=BinaryClassificationEvaluator(
labelCol='dessert',
rawPredictionCol='rawPrediction',
metricName='areaUnderROC'
)
lr_model=foods_model.stages[-1]
metrics=lr_model.evaluate(results.select('dessert','features'))
print(f"Motel precision: {metrics.precisionByLabel[1]}")
print(f"Motel recall: {metrics.recallByLabel[1]}")
areaUnderROC=evaluator.evaluate(results)
print('AUC for the baseline model')
print(f'Area under ROC = {areaUnderROC}')
plt.figure(figsize=(5,5))
plt.plot([0,1],[0,1],'r--')
plt.plot(
lr_model.summary.roc.select("FPR").collect(),
lr_model.summary.roc.select("TPR").collect()
)
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.show()
# ------------- Result -------------
Model precision: 0.9109900090826522
Model recall: 0.9151459854014599
AUC for the baseline model
Area under ROC = 0.9907149155115066
7. Model tuning and validation
K-fold cross-validation is used here to tune the hyperparameters; because it takes a while, the Spark session is reconfigured with more resources.
# Cluster resources:
# 3 nodes
# 12 cores
# 32 GB of memory
spark=(
SparkSession
.builder
.master('yarn')
.appName('KfoldCrossValidModel')
.config('spark.executor.cores','3')
.config('spark.executor.instances','3')
.config('spark.driver.memory','8g')
.config('spark.executor.memory','6g')
.getOrCreate()
)
# Add the elastic-net mixing ratio between L1 and L2 regularization to the parameter grid
grid_search=(
ParamGridBuilder()
.addGrid(lr.elasticNetParam,[0.0,0.25,0.5,0.75,1.0])
.build()
)
# BinaryClassificationEvaluator is the metric used by the cross-validation
# Split the data into 3 folds and keep the sub-models produced during training
cv = CrossValidator(
estimator=foods_pipeline,
estimatorParamMaps=grid_search,
evaluator=evaluator,
numFolds=3,
seed=5,
collectSubModels=True
)
cv_model=cv.fit(foods)
print(cv_model.avgMetrics)
# ------------- Result -------------
# Each value is the areaUnderROC of one elasticNetParam setting, averaged across the folds
# The best score is at elasticNetParam = 0.5
[0.991426005564081, 0.9914260072284723, 0.9914287502554829, 0.9914247169694952, 0.991428430694426]
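To see which grid entry the best average score corresponds to, avgMetrics can be paired with the parameter grid (a small sketch):
# Pair each fold-averaged AUC with its grid entry; the largest one wins.
best_idx = max(range(len(cv_model.avgMetrics)), key=lambda i: cv_model.avgMetrics[i])
print(grid_search[best_idx])          # the ParamMap holding the winning elasticNetParam
print(cv_model.avgMetrics[best_idx])  # its averaged areaUnderROC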
# Save the model
cv_model.write().overwrite().save('hdfs://mainnode:8020/a1/my_model/dessert_Kfold')
Loading and validating the model
If the intercept and coefficients can be read back normally, the model has finished training.
loaded_model=CrossValidatorModel.load('hdfs://mainnode:8020/a1/my_model/dessert_Kfold')
best_model = loaded_model.bestModel
lr_model = best_model.stages[-1]
results = best_model.transform(foods)
# Use the column names stored in the schema metadata: when the features are assembled into a vector, the entries are flattened and renamed
# e.g. continuous_scaled is expanded into continuous_scaled_0 ... continuous_scaled_n-1
feature_names=['lr_Intercept']+[
x['name']
for x in (
results
.schema['features']
.metadata['ml_attr']['attrs']['numeric']
)
]
feature_coefficients = [lr_model.intercept] + list(
lr_model.coefficients.values
)
coefficients = pd.DataFrame(
feature_coefficients,index=feature_names,columns=['coef']
)
print(coefficients)
# ------------- Result -------------
coef
lr_Intercept -1.831518
squid -4.722940
wedding -1.725413
pasta -2.910748
triple_sec 0.203789
... ...
continuous_scaled_2 24.190459
continuous_scaled_3 -5.736020
continuous_scaled_4 -7.597928
protein_ratio -7.945574
fat_ratio 0.656431
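To read off which features push a recipe most strongly toward or away from the dessert class, the coefficient table can be sorted (a sketch):
# Largest positive coefficients push toward dessert; most negative push away from it.
print(coefficients.sort_values('coef', ascending=False).head(10))
print(coefficients.sort_values('coef').head(10))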
Computing precision and recall
metrics = lr_model.evaluate(results.select('dessert','features'))
print(f"Model precision: {metrics.precisionByLabel[1]}")
print(f"Model recall: {metrics.recallByLabel[1]}")
results.select('title','rawPrediction','probability','dessert').show(20,False)
# ------------- Result -------------
Model precision: 0.9499578296317122
Model recall: 0.9457038902882732
Visualization
from sklearn.metrics import roc_curve,auc
# The training summary is not stored inside the loaded model, so FPR, TPR, and the ROC curve are computed manually
# The raw prediction scores (rawPrediction) serve as the input scores
# The dessert column provides the labels
predictions = results.select('rawPrediction','dessert').collect()
labels = [row.dessert for row in predictions]
scores = [row.rawPrediction[1] for row in predictions]
FPR,TPR,_ = roc_curve(labels,scores)
areaUnderROC = auc(FPR,TPR)
print('AUC for the cross-validated model')
print(f'Area under ROC = {areaUnderROC}')
plt.figure(figsize=(5,5))
plt.plot([0,1],[0,1],'r--')
plt.plot(FPR,TPR)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
# ------------- Result -------------
AUC for the cross-validated model
Area under ROC = 0.9972339842699864