我有两个链接的Spark-ML管道(参见下面的代码)。第一个用于数据准备(string indexing -> imputing -> one-hot-encoding ->向量汇编),第二个用于模型估计(随机森林分类器)。数据争论管道产生目标向量和特征的稀疏向量(其中有270个特征)。
我在训练数据上拟合了两个连续的管道。到目前一切尚好。
接下来,我想在看不见(=评分)数据的评分中使用相同的管道。但是,当我通过数据准备管道运行评分数据时,可能是因为这个新数据集的一些分类变量中的标签数量较少,它生成的列较少(可能是由于一次热编码)。因此,向量汇编器产生一个更窄的稀疏向量(它只有250个特征),它在计分管道中产生和"IndexOutOfBoundsError“(它正在寻找不存在的索引252 )。
您能否建议我如何规避此问题的一般策略/最佳实践?
出于某种原因,我认为当您在拟合的管道中使用'.transform‘操作时,它将保留维度(只需用0填充非当前列),但显然并非如此。
A)使用训练'df‘安装管道
stages_string_indexing = [StringIndexer(inputCol= c, outputCol= c+"_strIndex", handleInvalid = "keep") for c in variable_types['categorical']]
categorical_str = [c+"_strIndex" for c in variable_types['categorical']]
stage_imputer = Imputer(inputCols = categorical_str, outputCols = categorical_str, missingValue = -999)
stage_one_hot_encoding = [OneHotEncoder(inputCol= c, outputCol= c+ "_ohe") for c in categorical_str]
stage_vector_assembler_cat = VectorAssembler(inputCols = [c+ "_ohe" for c in categorical_str], outputCol= "features_cat")
pipeline_dataPrep_cat = Pipeline(stages= stages_string_indexing + [stage_imputer] + stage_one_hot_encoding + [stage_vector_assembler_cat])
stage_imputer_q = Imputer(inputCols = variable_types['quantitative'], outputCols = variable_types['quantitative'], strategy = "median")
stage_vector_assembler_q = VectorAssembler(inputCols = variable_types['quantitative'], outputCol= "features_q")
pipeline_dataPrep_q = Pipeline(stages= [stage_imputer_q, stage_vector_assembler_q])
stage_vector_assembler_all = VectorAssembler(inputCols = ['features_q', 'features_cat'], outputCol= "FEATURES")
pipeline_dataPrep = Pipeline(stages= [pipeline_dataPrep_cat,pipeline_dataPrep_q, stage_vector_assembler_all])
pipelineModel_dataPrep = pipeline_dataPrep.fit(df)
df = pipelineModel_dataPrep.transform(df)
modelling_data = df.select("TARGET", "FEATURES")
modelling_data.show()
stage_rf_classifier = RandomForestClassifier(labelCol = "TARGET", featuresCol = "FEATURES")
pipeline_rf = Pipeline(stages = [stage_rf_classifier])
# DEFINE CROSS-VALIDATION experiment
paramGrid = ParamGridBuilder() \
.addGrid(stage_rf_classifier.numTrees, [20, 100]) \
.addGrid(stage_rf_classifier.maxDepth, [2, 10]) \
.build()
cross_validator_rf = CrossValidator(estimator = pipeline_rf
, estimatorParamMaps=paramGrid
, evaluator = BCE(labelCol = "TARGET"
, rawPredictionCol = "probability"
, metricName = "areaUnderROC")
, numFolds= 2)
# clock the model fitting:
print('Model fitting started at: '+ str(datetime.datetime.now()))
cv_rf = cross_validator_rf.fit(modelling_data_sample)
print('Model fitting fnished at: '+ str(datetime.datetime.now()))
发布于 2019-08-01 20:42:29
我已经尝试了在一个简单的用例中描述的东西,但我无法复制上面描述的问题(所以我现在真的很困惑)。
下面是我的例子:
# ================================================
# TRAIN:
# ================================================
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline, Transformer
df = spark.createDataFrame(
[(0, 1,"a", 1.2)
, (1, 0, "b", 0.8)
, (2, 1, "c", 7.3)
, (3, 0,"a", 6.25)
, (4, 0, "a", 4.53)
, (5, 0, "c", 1.45)],
["id", "target","category","number"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex", handleInvalid = "keep")
ohe = OneHotEncoder(inputCol= "categoryIndex", outputCol= "categoryIndex_ohe")
vas = VectorAssembler(inputCols = ["id","categoryIndex_ohe","number"], outputCol= "features")
pipeline_dataPrep = Pipeline(stages= [indexer, ohe, vas])
pipelineTrans = pipeline_dataPrep.fit(df)
df_trans = pipeline_dataPrep.fit(df).transform(df).select('id','target','features')
df_trans.show()
from pyspark.ml.classification import RandomForestClassifier
# fit model:
rf = RandomForestClassifier(labelCol="target", featuresCol="features", numTrees=10)
pipeline_rf = Pipeline(stages= [rf])
pipelineModel = pipeline_rf.fit(df_trans)
# ================================================
# SCORE:
# ================================================
df_score = spark.createDataFrame(
[(0, "a", 0.3)
, (1, "a", 2.8)
, (2, "a", 1.3)
, (3,"a", 1.25)
, (4, "d", 3.53)
, (5, "a", 1.45)],
["id","category","number"])
df_score_trans = pipelineTrans.transform(df_score)
df_score_trans.select('id','features').show()
df_rf = pipelineModel.transform(df_score_trans)
df_rf.show()
https://stackoverflow.com/questions/57306245
复制相似问题