
PySpark project: dessert classification


Goal: build a model that uses the information provided for each dish as feature columns and predicts whether the dish is a dessert.

The source data provides: dish name, nutrition figures, ingredients, and the dish's description and notes.

Data cleaning

  1. Confirming the data size and structure
  2. Normalizing the column names
  3. Splitting continuous and categorical values
  4. Handling outliers and missing values, converting types
  5. Adding feature columns
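The listings below assume a running SparkSession and a set of imports the original post never shows; the following preamble is a minimal sketch of what they would look like (standard pyspark.ml names, nothing project-specific):

import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import keyword_only
from pyspark.ml import Estimator, Model, Pipeline
from pyspark.ml.feature import Imputer, MinMaxScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import (HasInputCol, HasOutputCol,
                                     HasInputCols, HasOutputCols)
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

spark = SparkSession.builder.appName('dessert-classifier').getOrCreate()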

Reading the DataFrame

foods = spark.read.csv('../DataSet/recipes/epi_r.csv',header=True,inferSchema=True)

1. Confirming the data size and structure

# 12-2 confirm the size of the data
print(foods.count(), len(foods.columns))  # 20057 680
foods.printSchema()

# rating and calories should be double -- strings have crept into those two columns
# some column names contain spaces and special characters
 root
 |-- title: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- calories: string (nullable = true)
 |-- protein: double (nullable = true)
 |-- fat: double (nullable = true)
 …… (omitted)
 |-- #cakeweek: double (nullable = true)
 |-- #wasteless: double (nullable = true)
 |-- 22-minute meals: double (nullable = true)
 |-- 3-ingredient recipes: double (nullable = true)
 |-- 30 days of groceries: double (nullable = true)
 |-- bon appétit: double (nullable = true)
 |-- house & garden: double (nullable = true)
 |-- crêpe: double (nullable = true)
 |-- sauté: double (nullable = true)
 |-- snack week: double (nullable = true)
 |-- turkey: double (nullable = true)

2. Normalizing the column names

# normalize to lowercase letters, digits and underscores
# special characters become underscores, except [&] which becomes [and]
def sanitize_column_name(name_org: str) -> str:
    re_name = name_org
    re_name = ' '.join(re_name.split())    # collapse extra whitespace
    # replace the special characters
    for i, j in ((' ', '_'), ('&', 'and'), ('-', '_'), ('\\', '_'), ('/', '_')):
        re_name = re_name.replace(i, j)
    # restrict which characters may remain
    return (
        "".join([
            char
            for char in re_name
            if char.isalpha() or char.isdigit() or char == '_'
        ]).lower()  # use isdigit rather than isnumeric: some characters only pass isdigit
    )

# overwrite the old column names
foods = foods.toDF(*[sanitize_column_name(col_name) for col_name in foods.columns])
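As a quick illustration (not part of the original listing), a few of the raw headers run through the function:

print(sanitize_column_name('22-minute meals'))        # 22_minute_meals
print(sanitize_column_name('house & garden'))         # house_and_garden
print(sanitize_column_name('#cakeweek'))              # cakeweek
print(sanitize_column_name('3-ingredient recipes'))   # 3_ingredient_recipes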

3. Splitting continuous and categorical values

First, identify which of the categorical columns are binary.

foods.cache()
for x in foods.columns:
    foods.select(x).summary().show()

# columns like the one below fit the profile of a binary value
# the mean shows the 0/1 split -- 0 dominates here, as in this casserole_gratin column
+-------+--------------------+
|summary|    casserole_gratin|
+-------+--------------------+
|  count|               20052|
|   mean|0.009924197087572311|
| stddev| 0.09912717808713252|
|    min|                 0.0|
|    25%|                 0.0|
|    50%|                 0.0|
|    75%|                 0.0|
|    max|                 1.0|
+-------+--------------------+

Verification

# there are 680 columns in total; 1000 leaves headroom
pd.set_option("display.max_rows", 1000)

# collect_set gathers each column's distinct values into an array;
# size (like Python's len) counts them, and == 2 flags the binary columns
foods_agg = foods.agg(*[
    (F.size(F.collect_set(x)) == 2).alias(x) for x in foods.columns
])
isbinary = foods_agg.toPandas()  # convert to a pandas DataFrame
print(isbinary.unstack())        # unstack into a long table

# --------- Result ---------
title                     0    False
rating                    0    False
……
snack_week                0     True
turkey                    0     True
anise                     0     True
anniversary               0     True
anthony_bourdain          0     True
aperitif                  0     True
appetizer                 0     True
……
dtype: bool

Sorting the columns into their respective lists

cols = foods_agg.head().asDict()
# first pick out the continuous columns
continues_cols = []
for k, v in cols.items():
    if not v:
        continues_cols.append(k)

# whatever is left is binary
b_cols = set(foods.columns) - set(continues_cols)
print(len(b_cols))
print(len(continues_cols))
print(sorted(continues_cols))
# --------- Result ---------
672
8
['cakeweek', 'calories', 'fat', 'protein', 'rating', 'sodium', 'title', 'wasteless']

In the end there are 672 binary columns; the remaining 8 are assumed to be continuous.

Filtering the continues_cols columns

# cakeweek and wasteless are recipe tags, so they should also be binary; verify:
foods.agg(F.collect_set('cakeweek'), F.collect_set('wasteless')).show(1, False)

foods.where("cakeweek > 1 or wasteless > 1")\
    .select('title', 'rating', 'calories', 'cakeweek', 'wasteless').show(20, False)

# --------- Result ---------
+-------------------------------+----------------------+
|collect_set(cakeweek)          |collect_set(wasteless)|
+-------------------------------+----------------------+
|[0.0, 1.0, 1188.0, 24.0, 880.0]|[0.0, 1.0, 1439.0]    |
+-------------------------------+----------------------+

+----------------------------------------------------------------+------------------------+-------------+--------+---------+
|title                                                           |rating                  |calories     |cakeweek|wasteless|
+----------------------------------------------------------------+------------------------+-------------+--------+---------+
|"Beet Ravioli with Pine Nut ""Goat Cheese"" Rosemary-Cream Sauce| Aged Balsamic Vinegar "|4.375        |880.0   |0.0      |
|"Seafood ""Cataplana"" with Saffron                             | Vermouth               | and Sorrel "|24.0    |1439.0   |
|"""Pot Roast"" of Seitan                                        | Aunt Gloria-Style "    |2.5          |1188.0  |0.0      |
+----------------------------------------------------------------+------------------------+-------------+--------+---------+

The 3 abnormal rows printed above show that not only do cakeweek and wasteless take impossible values; the rating and calories columns, which should be double, also carry strings. Both symptoms come from misaligned rows.

4. Handling outliers and missing values, converting types

Outlier handling

# keep the valid values 0 and 1, and keep nulls for the later imputation step
foods = foods.where(
    (F.col('cakeweek').isin([0, 1]) | F.col('cakeweek').isNull())
    & (F.col('wasteless').isin([0, 1]) | F.col('wasteless').isNull())
)
# verify: 3 rows dropped, 20054 rows and 680 columns remain
print(foods.count(), len(foods.columns))

Regrouping the columns

# regroup the columns into three kinds, removing each from the remainder
# id column
identify_column = ['title']
# continuous columns
continue_columns = ['rating', 'calories', 'protein', 'fat', 'sodium']
# the label: binary, but not an input feature
target_columns = ['dessert']
# binary feature columns
binary_columns = [col
                  for col in foods.columns
                  if (col not in identify_column)
                  and (col not in continue_columns)
                  and (col not in target_columns)
                  ]

Handling critical missing values

foods = foods.dropna(how='all', subset=[col for col in foods.columns if col not in identify_column])
foods = foods.dropna(how='any', subset=target_columns)
print(foods.count(), len(foods.columns))  # 20049 680 -- 5 rows dropped
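For reference, how='all' only drops a row when every subset column is null, while how='any' drops it as soon as one is. A toy illustration (not from the original article):

toy = spark.createDataFrame([(None, None), (1.0, None), (1.0, 2.0)], 'a double, b double')
toy.dropna(how='all').show()  # keeps (1.0, null) and (1.0, 2.0)
toy.dropna(how='any').show()  # keeps (1.0, 2.0) only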

Filling the remaining missing values

# every binary column's mean in the summary output is below 0.5, so fill NA with 0
foods = foods.fillna(0.0, subset=binary_columns)

Because the binary values are so heavily concentrated at 0, we can drop the columns that carry no real signal, improving both model quality and training speed.

# a column is insignificant when it has fewer than 10 ones or fewer than 10 zeros
val_sum = [F.sum(col_name).alias(col_name) for col_name in binary_columns]
val_cnt_dict = foods.agg(*val_sum).head().asDict()  # whole-frame aggregate as a plain Python dict

rows_cnt = foods.count()
rare_col = [k for k, v in val_cnt_dict.items() if v < 10 or v > rows_cnt - 10]  # the insignificant columns
print(len(rare_col))  # 167 insignificant columns
binary_columns = list(set(binary_columns) - set(rare_col))

Cleaning the continuous values

# rating and calories carry stray strings, so filter them with a UDF
@F.udf(T.BooleanType())
def is_a_number(val: str) -> bool:
    # treat NA values as valid
    if not val:
        return True
    try:
        _ = float(val)
    except ValueError:
        return False
    return True

# verify:
foods.where((~is_a_number('rating')) | (~is_a_number('calories'))).show()

# --------- Result ---------
+---------+------------+
|   rating|    calories|
+---------+------------+
| Cucumber| and Lemon "|
+---------+------------+

Batch type conversion

# keep the rows where rating and calories are numeric or null, then cast to double
for col in ['rating', 'calories']:
    foods = foods.where(is_a_number(col))
    foods = foods.withColumn(col, foods[col].cast(T.DoubleType()))

5. Adding feature columns

# protein supplies 4 kcal/g and fat 9 kcal/g; dividing by total calories gives an energy ratio
# unlike the raw protein or fat grams, the ratio adjusts for the overall size of the dish
foods = (foods
         .withColumn('protein_ratio', F.col('protein') * 4 / F.col('calories'))
         .withColumn('fat_ratio', F.col('fat') * 9 / F.col('calories'))
         )
# the division can yield NA values, so fill those with 0
foods = foods.fillna(0.0, subset=['protein_ratio', 'fat_ratio'])
# extend the list of continuous columns
continue_columns += ['protein_ratio', 'fat_ratio']
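For instance, a hypothetical dish with 10 g protein, 5 g fat and 200 kcal gets protein_ratio = 10×4/200 = 0.2 and fat_ratio = 5×9/200 = 0.225. A throwaway check (values invented for illustration):

demo = spark.createDataFrame([(10.0, 5.0, 200.0)], 'protein double, fat double, calories double')
demo.select(
    (F.col('protein') * 4 / F.col('calories')).alias('protein_ratio'),  # 0.2
    (F.col('fat') * 9 / F.col('calories')).alias('fat_ratio'),          # 0.225
).show()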

Machine learning

  1. Imputing null values
  2. Handling extreme feature values
  3. Normalizing the feature values
  4. Vectorizing the features
  5. Fitting a logistic regression on the input features and labels
  6. First check of the model's classification quality
  7. Model tuning and validation

1. Imputing null values

OLD_COLUMNS=['calories','protein','fat','sodium']
NEW_COLUMNS=['calories_i','protein_i','fat_i','sodium_i']

imputer = Imputer(
    strategy='median',
    inputCols=OLD_COLUMNS,
    outputCols=NEW_COLUMNS
)
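The imputer is only declared here; the pipeline fits it later. Used standalone it would look like this (a sketch):

# the median of each input column replaces its nulls in the *_i output columns
imputer_model = imputer.fit(foods)
foods_imputed = imputer_model.transform(foods)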

2. Handling extreme feature values

Here we define a custom estimator:

Values farther from the mean than `boundary` times the standard deviation are treated as extreme and replaced with the corresponding cap or floor (boundary is set to 2 below).

class _ExtremeValueCapperParam(
    HasInputCol, HasOutputCol,
    HasInputCols, HasOutputCols,
    DefaultParamsWritable, DefaultParamsReadable
    ):
    boundary = Param(
        Params._dummy(),
        'boundary',
        'Multiple of standard deviation for the cap and floor. Default=0.0',
        TypeConverters.toFloat,
    )

    def __init__(self, *args):
        super().__init__(*args)  # DefaultParamsWritable/Readable are mixed in to avoid persistence warnings
        self._setDefault(boundary=0.0)

    @keyword_only
    def setParams(self, *,
                  inputCol=None, outputCol=None,
                  inputCols=None, outputCols=None,
                  boundary=0.0, cap_lt=None, floor_lt=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, input_col):
        return self.setParams(inputCol=input_col)

    def setOutputCol(self, output_col):
        return self.setParams(outputCol=output_col)

    def setInputCols(self, new_inputCols):
        return self.setParams(inputCols=new_inputCols)

    def setOutputCols(self, new_outputCols):
        return self.setParams(outputCols=new_outputCols)

class ExtremeValueCapperModel(Model, _ExtremeValueCapperParam):
    cap_lt = Param(
        Params._dummy(),
        'cap_lt',
        'Upper bound of the values `inputCols` can take. '
        'Values will be capped to these values.',
        TypeConverters.toListFloat
    )
    floor_lt = Param(
        Params._dummy(),
        'floor_lt',
        'Lower bound of the values `inputCols` can take. '
        'Values will be floored to these values.',
        TypeConverters.toListFloat
    )

    @keyword_only
    def __init__(self,
                 inputCols=None, outputCols=None,
                 cap_lt=None, floor_lt=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    def setCap_lt(self, your_cap):
        return self.setParams(cap_lt=your_cap)

    def setFloor_lt(self, your_floor):
        return self.setParams(floor_lt=your_floor)

    def getCap_lt(self):
        return self.getOrDefault(self.cap_lt)

    def getFloor_lt(self):
        return self.getOrDefault(self.floor_lt)

    def _transform(self, dataset):
        if not self.isSet('inputCols'):
            raise ValueError(
                'No input columns set for the '
                'ExtremeValueCapperModel'
            )

        input_columns = self.getInputCols()   # plain column-name strings, not Column objects
        output_columns = self.getOutputCols()
        cap_list = self.getOrDefault('cap_lt')
        floor_list = self.getOrDefault('floor_lt')

        for input_col, output_col, cap_value, floor_value in zip(
                input_columns, output_columns, cap_list, floor_list):
            dataset = dataset.withColumn(
                output_col,
                F.when(F.col(input_col) > cap_value, cap_value)
                    .when(F.col(input_col) < floor_value, floor_value)
                    .otherwise(F.col(input_col))
            )

        return dataset


class ExtremeValueCapper(Estimator, _ExtremeValueCapperParam):
    @keyword_only
    def __init__(self,
                 inputCol=None, outputCol=None,
                 inputCols=None, outputCols=None,
                 boundary=0.0):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    def setBoundary(self, your_boundary):
        return self.setParams(boundary=your_boundary)

    def getBoundary(self):
        return self.getOrDefault(self.boundary)

    def _fit(self, dataset):
        input_columns = ([self.getInputCol()] if self.isSet('inputCol') else self.getInputCols())
        output_columns = ([self.getOutputCol()] if self.isSet('outputCol') else self.getOutputCols())
        boundary = self.getBoundary()
        cap_list = []
        floor_list = []
        for col_in in input_columns:
            avg, std = dataset.agg(F.avg(col_in), F.stddev(col_in)).head()
            cap_value = avg + std * boundary
            floor_value = avg - std * boundary
            cap_list.append(cap_value)
            floor_list.append(floor_value)

        return (
            ExtremeValueCapperModel(
                inputCols=input_columns,
                outputCols=output_columns,
                cap_lt=cap_list,
                floor_lt=floor_list
            )
        )

limited_vals = ExtremeValueCapper(inputCols=continue_columns, outputCols=continue_columns, boundary=2)
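A quick sanity check of the estimator on a toy column (illustrative only; the real pipeline fits it on the recipe data):

toy = spark.createDataFrame([(1.0,), (2.0,), (3.0,), (100.0,)], 'x double')
capper = ExtremeValueCapper(inputCols=['x'], outputCols=['x_capped'], boundary=1.0)
capper.fit(toy).transform(toy).show()  # 100.0 is pulled down to mean + 1*stddev (~75.5)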

3 & 4. Vectorizing and normalizing the features

Vectorizing the continuous values

continue_columns_i = list(set(continue_columns) - set(OLD_COLUMNS)) + NEW_COLUMNS
CONTINUE_NB = [x for x in continue_columns_i if 'ratio' not in x]  # the ratio columns join the final assembler unscaled
continuous_assembler = VectorAssembler(
    inputCols=CONTINUE_NB,
    outputCol='continuous'
)

Normalizing the continuous values

continuous_scaler = MinMaxScaler(
    min=0.0,
    max=1.0,
    inputCol='continuous',
    outputCol='continuous_scaled'
)
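MinMaxScaler rescales each vector slot with (x - col_min) / (col_max - col_min); a toy check (not from the original):

toy = spark.createDataFrame([(0.0,), (5.0,), (10.0,)], 'v double')
toy_vec = VectorAssembler(inputCols=['v'], outputCol='vec').transform(toy)
MinMaxScaler(inputCol='vec', outputCol='scaled').fit(toy_vec).transform(toy_vec).show()
# 0.0 -> [0.0], 5.0 -> [0.5], 10.0 -> [1.0]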

Assembling the continuous and binary features

preml_assembler=VectorAssembler(
    inputCols=binary_columns
    +['continuous_scaled']
    +['protein_ratio','fat_ratio'],
    outputCol='features'
)

5. Logistic regression

lr=LogisticRegression(
    featuresCol='features',
    labelCol='dessert',
    predictionCol='prediction'
)

Finally, chain all of the stages above into a pipeline:

foods_pipeline = Pipeline()
foods_pipeline.setStages(
    [
     imputer,
     limited_vals,
     continuous_assembler, continuous_scaler,
     preml_assembler,
     lr
     ]
)

# 70% of the data for training, 30% for testing, random seed 3
train, test = foods.randomSplit([0.7, 0.3], 3)
train.cache()

foods_model = foods_pipeline.fit(train)
results = foods_model.transform(test)
results.cache()

6. First check of the model's classification quality

evaluator = BinaryClassificationEvaluator(
    labelCol='dessert',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC'
)
lr_model = foods_model.stages[-1]
metrics = lr_model.evaluate(results.select('dessert', 'features'))
print(f"Model precision: {metrics.precisionByLabel[1]}")
print(f"Model recall: {metrics.recallByLabel[1]}")
areaUnderROC = evaluator.evaluate(results)
print('AUC of the baseline model')
print(f'Area under ROC = {areaUnderROC}')

plt.figure(figsize=(5, 5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(
    lr_model.summary.roc.select("FPR").collect(),
    lr_model.summary.roc.select("TPR").collect()
)
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.show()

# ------------- Result -------------
Model precision: 0.9109900090826522
Model recall: 0.9151459854014599

AUC of the baseline model
Area under ROC = 0.9907149155115066

[Figure: initial ROC curve]
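Beyond precision and recall, a raw confusion matrix is a cheap extra check (a sketch, not part of the original run):

# counts of label/prediction pairs on the test set
results.groupBy('dessert', 'prediction').count().show()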

7. Model tuning and validation

Here we use K-fold cross-validation to tune the hyperparameters. Because it runs for a long time, the Spark session is reconfigured with more resources first:

# cluster resources:
# 3 nodes
# 12 cores
# 32 GB memory
spark = (
    SparkSession
    .builder
    .master('yarn')
    .appName('KfoldCrossValidModel')
    .config('spark.executor.cores', '3')
    .config('spark.executor.instances', '3')
    .config('spark.driver.memory', '8g')
    .config('spark.executor.memory', '6g')
    .getOrCreate()
)

# add the L1/L2 (elastic-net) mixing ratio of the regression to the parameter grid
grid_search = (
    ParamGridBuilder()
    .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
    .build()
)

# BinaryClassificationEvaluator drives the cross-validation
# 3 folds, keeping the sub-models trained along the way
cv = CrossValidator(
    estimator=foods_pipeline,
    estimatorParamMaps=grid_search,
    evaluator=evaluator,
    numFolds=3,
    seed=5,
    collectSubModels=True
)

cv_model = cv.fit(foods)
print(cv_model.avgMetrics)

# ------------- Result -------------
# each value is the evaluator metric (AUC) averaged over the folds for one elasticNetParam setting
# the ratio 0.5 scores best
[0.991426005564081, 0.9914260072284723, 0.9914287502554829, 0.9914247169694952, 0.991428430694426]

# save the model
cv_model.write().overwrite().save('hdfs://mainnode:8020/a1/my_model/dessert_Kfold')
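avgMetrics lines up index-for-index with the parameter maps, so the winning grid entry can be recovered like this (a sketch):

best_idx = max(range(len(cv_model.avgMetrics)), key=cv_model.avgMetrics.__getitem__)
print(grid_search[best_idx])  # expected to show elasticNetParam = 0.5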

Loading and validating the model

If the intercept and the coefficients can be read back normally, the model finished training.

loaded_model = CrossValidatorModel.load('hdfs://mainnode:8020/a1/my_model/dessert_Kfold')

best_model = loaded_model.bestModel
lr_model = best_model.stages[-1]
results = best_model.transform(foods)

# read the feature names from the schema metadata: vectorization flattens nested vectors
# and renames them, e.g. continuous_scaled becomes continuous_scaled_0 ... continuous_scaled_n-1
feature_names = ['lr_Intercept'] + [
    x['name']
    for x in (
        results
        .schema['features']
        .metadata['ml_attr']['attrs']['numeric']
    )
]
feature_coefficients = [lr_model.intercept] + list(
    lr_model.coefficients.values
)
coefficients = pd.DataFrame(
    feature_coefficients, index=feature_names, columns=['coef']
)
print(coefficients)
# ------------- Result -------------
                         coef
lr_Intercept         -1.831518
squid                -4.722940
wedding              -1.725413
pasta                -2.910748
triple_sec            0.203789
...                        ...
continuous_scaled_2  24.190459
continuous_scaled_3  -5.736020
continuous_scaled_4  -7.597928
protein_ratio        -7.945574
fat_ratio             0.656431
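With the coefficients in a pandas DataFrame, sorting exposes the strongest signals in either direction (a sketch; the exact output will vary):

print(coefficients.sort_values('coef').tail(10))  # features pushing towards "dessert"
print(coefficients.sort_values('coef').head(10))  # features pushing away from "dessert"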

Computing precision and recall

metrics = lr_model.evaluate(results.select('dessert', 'features'))
print(f"Model precision: {metrics.precisionByLabel[1]}")
print(f"Model recall: {metrics.recallByLabel[1]}")

results.select('title', 'rawPrediction', 'probability', 'dessert').show(20, False)

# ------------- Result -------------
Model precision: 0.9499578296317122
Model recall: 0.9457038902882732

Visualization

from sklearn.metrics import roc_curve, auc
# the loaded model carries no training summary, so FPR, TPR and the ROC are computed by hand
# the raw prediction rawPrediction supplies the scores
# the dessert column supplies the labels
predictions = results.select('rawPrediction', 'dessert').collect()
labels = [row.dessert for row in predictions]
scores = [row.rawPrediction[1] for row in predictions]
FPR, TPR, _ = roc_curve(labels, scores)
areaUnderROC = auc(FPR, TPR)
print('AUC of the cross-validated model')
print(f'Area under ROC = {areaUnderROC}')

plt.figure(figsize=(5, 5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(FPR, TPR)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

# ------------- Result -------------
AUC of the cross-validated model
Area under ROC = 0.9972339842699864

[Figure: ROC curve under cross-validation]

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. For infringement concerns, contact cloudcommunity@tencent.com for removal.
