
Dealing with categories with a small number of members in scoring data, within a (Py)Spark pipeline

Stack Overflow user
Asked on 2019-08-01 17:46:29
1 answer · 81 views · 0 followers · 0 votes

I have two chained Spark-ML pipelines (see the code below). The first one handles data preparation (string indexing -> imputing -> one-hot encoding -> vector assembling), the second one model estimation (a random forest classifier). The data-wrangling pipeline produces the target vector and a sparse feature vector (with 270 features).

I fit both consecutive pipelines on the training data. So far so good.

Next, I want to use the same pipelines to score unseen (= scoring) data. However, when I run the scoring data through the data-preparation pipeline, it produces fewer columns, presumably because some of the categorical variables in this new dataset contain fewer labels (so one-hot encoding yields fewer dummy columns). As a result, the vector assembler produces a narrower sparse vector (only 250 features), which then causes an "IndexOutOfBoundsError" in the scoring pipeline (it looks for index 252, which does not exist).

Could you suggest a general strategy / best practice for getting around this problem?

For some reason I assumed that when you use the .transform method of a fitted pipeline, it would preserve the dimensionality (simply filling the absent columns with 0), but apparently that is not the case.
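That assumption is actually correct for a *fitted* PipelineModel: the one-hot width is frozen at fit() time, so the symptom described above typically appears when the data-prep pipeline is refit on the scoring data instead of being reused. A minimal pure-Python sketch (not the Spark API, just a toy model of StringIndexer + OneHotEncoder with handleInvalid="keep") illustrates the difference:

```python
# Toy model of StringIndexer + OneHotEncoder(handleInvalid="keep"):
# the one-hot width equals the number of labels seen at fit() time,
# plus one extra bucket for unseen labels.
def fit_encoder(labels):
    """Build a label -> index map from training labels (mimics fit())."""
    vocab = {lab: i for i, lab in enumerate(sorted(set(labels)))}
    width = len(vocab) + 1          # +1: the unseen-label bucket
    return vocab, width

def transform(vocab, width, label):
    """Encode with the *fitted* vocabulary (mimics PipelineModel.transform)."""
    vec = [0] * width
    vec[vocab.get(label, width - 1)] = 1   # unseen labels -> last bucket
    return vec

train = ["a", "b", "c", "a"]
score = ["a", "a", "d"]              # fewer distinct labels, plus unseen "d"

vocab, width = fit_encoder(train)    # width == 4, fixed by the training data
scored = [transform(vocab, width, lab) for lab in score]
assert all(len(v) == width for v in scored)   # width preserved on scoring data

# The mistake that shrinks the vector: refitting on the scoring data.
vocab2, width2 = fit_encoder(score)
assert width2 < width                # only 3 wide -> index errors downstream
```

So the general strategy is: fit the data-preparation pipeline once on the training data, keep the resulting fitted model, and call only its transform on scoring data.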

A) Fitting the pipelines on the training df

from pyspark.ml.feature import StringIndexer, Imputer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator as BCE
import datetime

stages_string_indexing = [StringIndexer(inputCol=c, outputCol=c + "_strIndex", handleInvalid="keep")
                          for c in variable_types['categorical']]

categorical_str = [c + "_strIndex" for c in variable_types['categorical']]
stage_imputer = Imputer(inputCols=categorical_str, outputCols=categorical_str, missingValue=-999)

stage_one_hot_encoding = [OneHotEncoder(inputCol=c, outputCol=c + "_ohe") for c in categorical_str]

stage_vector_assembler_cat = VectorAssembler(inputCols=[c + "_ohe" for c in categorical_str],
                                             outputCol="features_cat")

pipeline_dataPrep_cat = Pipeline(stages=stages_string_indexing + [stage_imputer]
                                 + stage_one_hot_encoding + [stage_vector_assembler_cat])

stage_imputer_q = Imputer(inputCols=variable_types['quantitative'],
                          outputCols=variable_types['quantitative'], strategy="median")

stage_vector_assembler_q = VectorAssembler(inputCols=variable_types['quantitative'], outputCol="features_q")

pipeline_dataPrep_q = Pipeline(stages=[stage_imputer_q, stage_vector_assembler_q])

stage_vector_assembler_all = VectorAssembler(inputCols=['features_q', 'features_cat'], outputCol="FEATURES")

pipeline_dataPrep = Pipeline(stages=[pipeline_dataPrep_cat, pipeline_dataPrep_q, stage_vector_assembler_all])

pipelineModel_dataPrep = pipeline_dataPrep.fit(df)
df = pipelineModel_dataPrep.transform(df)

modelling_data = df.select("TARGET", "FEATURES")
modelling_data.show()


stage_rf_classifier = RandomForestClassifier(labelCol="TARGET", featuresCol="FEATURES")

pipeline_rf = Pipeline(stages=[stage_rf_classifier])

# DEFINE CROSS-VALIDATION experiment
paramGrid = ParamGridBuilder() \
    .addGrid(stage_rf_classifier.numTrees, [20, 100]) \
    .addGrid(stage_rf_classifier.maxDepth, [2, 10]) \
    .build()

cross_validator_rf = CrossValidator(estimator=pipeline_rf,
                                    estimatorParamMaps=paramGrid,
                                    evaluator=BCE(labelCol="TARGET",
                                                  rawPredictionCol="probability",
                                                  metricName="areaUnderROC"),
                                    numFolds=2)

# clock the model fitting:
print('Model fitting started at: ' + str(datetime.datetime.now()))
cv_rf = cross_validator_rf.fit(modelling_data)
print('Model fitting finished at: ' + str(datetime.datetime.now()))
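As a side note, the cross-validation setup above trains more forests than it might appear: each parameter combination is fitted once per fold, plus one final refit with the best parameters. A quick count (pure Python, for the grid shown):

```python
# Number of model fits CrossValidator performs for the grid above:
# 2 numTrees values x 2 maxDepth values = 4 combinations, each fitted
# once per fold, plus one final refit on the full data.
num_trees_values = [20, 100]
max_depth_values = [2, 10]
num_folds = 2

grid_size = len(num_trees_values) * len(max_depth_values)   # 4 combinations
total_fits = grid_size * num_folds + 1                      # 9 random forests
print(total_fits)
```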

1 Answer

Stack Overflow user

Answered on 2019-08-01 20:42:29

I tried what you describe in a simple use case, but I could not reproduce the problem described above (so I am genuinely confused now).

Here is my example:

# ================================================
# TRAIN:
# ================================================

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline, Transformer

df = spark.createDataFrame(
    [(0, 1,"a", 1.2)
     , (1, 0, "b", 0.8)
     , (2, 1, "c", 7.3)
     , (3, 0,"a", 6.25)
     , (4, 0, "a", 4.53)
     , (5, 0, "c", 1.45)],
    ["id", "target","category","number"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex", handleInvalid = "keep")
ohe = OneHotEncoder(inputCol= "categoryIndex", outputCol= "categoryIndex_ohe")
vas = VectorAssembler(inputCols = ["id","categoryIndex_ohe","number"], outputCol= "features")

pipeline_dataPrep = Pipeline(stages= [indexer, ohe, vas])

pipelineTrans = pipeline_dataPrep.fit(df)
df_trans = pipelineTrans.transform(df).select('id','target','features')  # reuse the fitted pipeline rather than refitting
df_trans.show()

from pyspark.ml.classification import RandomForestClassifier

# fit model:
rf = RandomForestClassifier(labelCol="target", featuresCol="features", numTrees=10)
pipeline_rf = Pipeline(stages= [rf])

pipelineModel = pipeline_rf.fit(df_trans)

# ================================================
# SCORE:
# ================================================

df_score = spark.createDataFrame(
    [(0, "a", 0.3)
     , (1, "a", 2.8)
     , (2, "a", 1.3)
     , (3,"a", 1.25)
     , (4, "d", 3.53)
     , (5, "a", 1.45)],
    ["id","category","number"])

df_score_trans = pipelineTrans.transform(df_score)
df_score_trans.select('id','features').show()

df_rf = pipelineModel.transform(df_score_trans)
df_rf.show()
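Note that this example scores with `pipelineTrans`, the pipeline model fitted on the *training* frame, which is exactly why the feature width is preserved (and why the unseen label "d" lands in the handleInvalid="keep" bucket rather than shrinking the vector). If a width mismatch can still slip through in your setup, a cheap guard before calling the classifier helps localize it. A pure-Python sketch (in Spark, the trained width could come from the model's `numFeatures` attribute and the scored width from the assembled vector's `size`; the literals below are placeholders):

```python
# Defensive check: compare the feature width the classifier was trained on
# with the width of the assembled scoring vector, and fail with a clear
# message instead of an IndexOutOfBoundsError deep inside transform().
def check_width(trained, scored):
    if trained != scored:
        raise ValueError(
            f"Feature width mismatch: model expects {trained}, got {scored}. "
            "Did you refit the data-prep pipeline on the scoring data "
            "instead of reusing the fitted PipelineModel?")
    return True

trained_width = 270   # placeholder, e.g. rf_model.numFeatures
scored_width = 250    # placeholder, e.g. first scored row's FEATURES.size

try:
    check_width(trained_width, scored_width)
except ValueError as e:
    print(e)   # the mismatch from the question, caught before model.transform
```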
Votes: 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/57306245
