前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >pyspark-ml学习笔记:pyspark下使用xgboost进行分布式训练

pyspark-ml学习笔记:pyspark下使用xgboost进行分布式训练

作者头像
MachineLP
发布于 2019-08-29 03:39:52
发布于 2019-08-29 03:39:52
6K00
代码可运行
举报
文章被收录于专栏:小鹏的专栏小鹏的专栏
运行总次数:0
代码可运行

问题是这样的,如果我们想基于pyspark开发一个分布式机器训练平台,而xgboost是不可或缺的模型,但是pyspark ml中没有对应的API,这时候我们需要想办法解决它。

测试代码: ( (pyspark使用可以参考这个:https://cloud.tencent.com/developer/article/1436179 ))

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#!/usr/bin/env python
# -*- coding:utf8 -*-

"""
------------------------------------------------- 
   Description :  pyspark测试 
   Author :       liupeng 
   Date :         2019/7/23 
------------------------------------------------- 

"""

import os 
import sys 
import time 
import pandas as pd 
import numpy as np 
from start_pyspark import spark, sc, sqlContext
import pyspark.sql.types as typ
import pyspark.ml.feature as ft
from pyspark.sql.functions import isnan, isnull


import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.72.jar,xgboost4j-0.72.jar pyspark-shell'
# import findspark
# findspark.init()

import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# spark.sparkContext.addPyFile("hdfs:///tmp/rd/lp/sparkxgb.zip") 
from sparkxgb import XGBoostEstimator


schema = StructType(
  [StructField("PassengerId", DoubleType()),
    StructField("Survival", DoubleType()),
    StructField("Pclass", DoubleType()),
    StructField("Name", StringType()),
    StructField("Sex", StringType()),
    StructField("Age", DoubleType()),
    StructField("SibSp", DoubleType()),
    StructField("Parch", DoubleType()),
    StructField("Ticket", StringType()),
    StructField("Fare", DoubleType()),
    StructField("Cabin", StringType()),
    StructField("Embarked", StringType())
  ])


df_raw = spark\
  .read\
  .option("header", "true")\
  .schema(schema)\
  .csv("hdfs:///tmp/rd/lp/titanic/train.csv")

df_raw.show(2)
df = df_raw.na.fill(0)

sexIndexer = StringIndexer()\
  .setInputCol("Sex")\
  .setOutputCol("SexIndex")\
  .setHandleInvalid("keep")
    
cabinIndexer = StringIndexer()\
  .setInputCol("Cabin")\
  .setOutputCol("CabinIndex")\
  .setHandleInvalid("keep")
    
embarkedIndexer = StringIndexer()\
  .setInputCol("Embarked")\
  .setHandleInvalid("keep")

# .setOutputCol("EmbarkedIndex")\

vectorAssembler = VectorAssembler()\
  .setInputCols(["Pclass", "Age", "SibSp", "Parch", "Fare"])\
  .setOutputCol("features")

xgboost = XGBoostEstimator( featuresCol="features", labelCol="Survival", predictionCol="prediction")

# pipeline = Pipeline().setStages([sexIndexer, cabinIndexer, embarkedIndexer, vectorAssembler, xgboost])
pipeline = Pipeline(stages=[
      vectorAssembler,
      xgboost
      ])


trainDF, testDF = df.randomSplit([0.8, 0.2], seed=24)
trainDF.show(2)


model = pipeline.fit(trainDF)

print (88888888888888888888)


model.transform(testDF).select(col("PassengerId"), col("prediction")).show()

print (9999999999999999999)


'''
# Define and train model
xgboost = XGBoostEstimator(
    # General Params
    nworkers=1, nthread=1, checkpointInterval=-1, checkpoint_path="",
    use_external_memory=False, silent=0, missing=float("nan"),
    
    # Column Params
    featuresCol="features", labelCol="label", predictionCol="prediction", 
    weightCol="weight", baseMarginCol="baseMargin", 
    
    # Booster Params
    booster="gbtree", base_score=0.5, objective="binary:logistic", eval_metric="error", 
    num_class=2, num_round=2, seed=None,
    
    # Tree Booster Params
    eta=0.3, gamma=0.0, max_depth=6, min_child_weight=1.0, max_delta_step=0.0, subsample=1.0,
    colsample_bytree=1.0, colsample_bylevel=1.0, reg_lambda=0.0, alpha=0.0, tree_method="auto",
    sketch_eps=0.03, scale_pos_weight=1.0, grow_policy='depthwise', max_bin=256,
    
    # Dart Booster Params
    sample_type="uniform", normalize_type="tree", rate_drop=0.0, skip_drop=0.0,
    
    # Linear Booster Params
    lambda_bias=0.0
)
'''

'''
xgboost_model = xgboost.fit(trainDF)

# Transform test set
xgboost_model.transform(testDF).show()

# Write model/classifier
xgboost.write().overwrite().save("xgboost_class_test")
xgboost_model.write().overwrite().save("xgboost_class_test.model")
'''

start_pyspark.py

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#!/usr/bin/env python
# -*- coding:utf8 -*-

"""
-------------------------------------------------
   Description :  模型预测接口
   Author :       liupeng
   Date :         2019/7/23
-------------------------------------------------

"""

import os
import sys

''' 
#下面这些目录都是你自己机器的Spark安装目录和Java安装目录
os.environ['SPARK_HOME'] = "/Users/***/spark-2.4.3-bin-hadoop2.7/" 

sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/bin") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/pyspark") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/lib/py4j-0.9-src.zip") 
# sys.path.append("/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home") 
os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home" 
'''

from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext 
#conf = SparkConf().setMaster("local").setAppName("My App") 
conf = SparkConf().setMaster("yarn").setAppName("My App") 
sc = SparkContext(conf = conf) 
spark = SparkSession.builder.appName('CalculatingGeoDistances').getOrCreate() 
sqlContext = SQLContext(sparkContext=sc) 

集群提交测试:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
nohup /di_software/emr-package/spark-2.4.3-bin-hadoop2.7/bin/spark-submit --master yarn --jars /home/di/liupeng/qdxgboost/xgboost4j-0.72.jar,/home/di/liupeng/qdxgboost/xgboost4j-spark-0.72.jar /home/di/liupeng/qdxgboost/test_xgboost.py > output_spark.log 2>&1 &

主要参考:pyspark xgboost: https://towardsdatascience.com/pyspark-and-xgboost-integration-tested-on-the-kaggle-titanic-dataset-4e75a568bdb ( 需要 spark2.3之后的版本 )

非网格搜索模式下加载和保存模型:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from sparkxgb import XGBoostEstimator, XGBoostClassificationModel
feature_path = '/tmp/rd/lp/model27'
model_path = '/tmp/rd/lp/model28'

xgboost = XGBoostEstimator(featuresCol="features", labelCol="Survival", predictionCol="prediction",
                                   num_round=100)

pipeline = Pipeline(stages=[vectorAssembler])
s_model = pipeline.fit(trainDF)
train_data = s_model.transform( trainDF )
s_model.write().overwrite().save(feature_path)

xgb_model = xgboost.fit( train_data )
xgb_model.write().overwrite().save(model_path)


pipeline = PipelineModel.load(feature_path)
train_data = s_model.transform( trainDF )
model = XGBoostClassificationModel.load( model_path )
res = model.transform( train_data )
print ( 'res:', res.collect() )
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019年08月13日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
scala-sparkML学习笔记:xgboost进行分布式训练
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
MachineLP
2019/08/31
4.5K0
pyspark-ml学习笔记:LogisticRegression
数据可以查看github:https://github.com/MachineLP/Spark-/tree/master/pyspark-ml
MachineLP
2019/08/01
1.9K0
pyspark-ml学习笔记:如何在pyspark ml管道中添加自己的函数作为custom stage?
问题是这样的,有时候spark ml pipeline中的函数不够用,或者是我们自己定义的一些数据预处理的函数,这时候应该怎么扩展呢? 扩展后保持和pipeline相同的节奏,可以保存加载然后transform。
MachineLP
2019/08/29
3.4K0
深入理解XGBoost:分布式实现
本文将重点介绍XGBoost基于Spark平台Scala版本的实现,带领大家逐步完成特征提取、变换和选择、XGBoost模型训练、Pipelines、模型选择。
Coggle数据科学
2020/03/17
4.4K0
深入理解XGBoost:分布式实现
如何使用Apache Spark MLlib预测电信客户流失
Spark MLLib是一个用于在海量数据集上执行机器学习和相关任务的库。使用MLlib,可以对十亿个观测值进行机器学习模型的拟合,可能只需要几行代码并利用数百台机器就能达到。MLlib大大简化了模型开发过程。
用户1415462
2018/05/30
4.2K0
在PySpark上使用XGBoost
我这里提供一个pyspark的版本,参考了大家公开的版本。同时因为官网没有查看特征重要性的方法,所以自己写了一个方法。本方法没有保存模型,相信大家应该会。
Sam Gor
2021/04/26
5.2K1
pyspark-ml学习笔记:模型评估
问题是这样的,如果我们想基于pyspark开发一个分布式机器训练平台,那么肯定需要对模型进行评估,而pyspark本身自带模型评估的api很少,想进行扩展的话有几种方案:
MachineLP
2019/08/29
1.3K0
【干货】Python大数据处理库PySpark实战——使用PySpark处理文本多分类问题
【导读】近日,多伦多数据科学家Susan Li发表一篇博文,讲解利用PySpark处理文本多分类问题的详情。我们知道,Apache Spark在处理实时数据方面的能力非常出色,目前也在工业界广泛使用。
WZEARW
2018/04/13
26.3K1
【干货】Python大数据处理库PySpark实战——使用PySpark处理文本多分类问题
基于Apache Spark机器学习的客户流失预测
流失预测是个重要的业务,通过预测哪些客户可能取消对服务的订阅来最大限度地减少客户流失。虽然最初在电信行业使用,但它已经成为银行,互联网服务提供商,保险公司和其他垂直行业的通用业务。
AlexanderTan
2018/02/07
3.5K0
基于Apache Spark机器学习的客户流失预测
spark杂记:movie recommendation using ALS
版权声明:本文为博主原创文章,未经博主允许不得转载。有问题可以加微信:lp9628(注明CSDN)。 https://blog.csdn.net/u014365862/article/details/88982729
MachineLP
2019/05/26
9800
PySpark 读写 JSON 文件到 DataFrame
PySpark SQL 提供 read.json("path") 将单行或多行(多行)JSON 文件读取到 PySpark DataFrame 并 write.json("path") 保存或写入 JSON 文件的功能,在本教程中,您将学习如何读取单个文件、多个文件、目录中的所有文件进入 DataFrame 并使用 Python 示例将 DataFrame 写回 JSON 文件。
数据STUDIO
2023/09/04
1.5K0
PySpark 读写 JSON 文件到 DataFrame
Spark MLlib特征处理 之 StringIndexer、IndexToString使用说明以及源码剖析
StringIndexer可以把字符串的列按照出现频率进行排序,出现次数最高的对应的Index为0。比如下面的列表进行StringIndexer
用户1154259
2018/07/31
2.8K0
spark杂记:Spark Basics
Spark 学习笔记可以follow这里:https://github.com/MachineLP/Spark-
MachineLP
2019/05/26
9490
PySpark|ML(评估器)
在PySpark中包含了两种机器学习相关的包:MLlib和ML,二者的主要区别在于MLlib包的操作是基于RDD的,ML包的操作是基于DataFrame的。根据之前我们叙述过的DataFrame的性能要远远好于RDD,并且MLlib已经不再被维护了,所以在本专栏中我们将不会讲解MLlib。
数据山谷
2020/11/24
1.6K0
PySpark|ML(评估器)
pyspark-ml学习笔记:逻辑回归、GBDT、xgboost参数介绍
逻辑回归、GBDT可以参考pyspark开发文档:http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression 。
MachineLP
2019/08/29
3.4K0
探索MLlib机器学习
实用工具:线性代数,统计,数据处理等工具 特征工程:特征提取,特征转换,特征选择 常用算法:分类,回归,聚类,协同过滤,降维 模型优化:模型评估,参数优化。
lyhue1991
2021/01/26
4.2K0
PySpark 中的机器学习库
传统的机器学习算法,由于技术和单机存储的限制,比如使用scikit-learn,只能在少量数据上使用。即以前的统计/机器学习依赖于数据抽样。但实际过程中样本往往很难做好随机,导致学习的模型不是很准确,在测试数据上的效果也可能不太好。随着 HDFS(Hadoop Distributed File System) 等分布式文件系统出现,存储海量数据已经成为可能。在全量数据上进行机器学习也成为了可能,这顺便也解决了统计随机性的问题。然而,由于 MapReduce 自身的限制,使得使用 MapReduce 来实现分布式机器学习算法非常耗时和消耗磁盘IO。因为通常情况下机器学习算法参数学习的过程都是迭代计算的,即本次计算的结果要作为下一次迭代的输入,这个过程中,如果使用 MapReduce,我们只能把中间结果存储磁盘,然后在下一次计算的时候从新读取,这对于迭代频发的算法显然是致命的性能瓶颈。引用官网一句话:Apache Spark™ is a unified analytics engine for large-scale data processing.Spark, 是一种"One Stack to rule them all"的大数据计算框架,期望使用一个技术堆栈就完美地解决大数据领域的各种计算任务.
MeteoAI
2019/07/24
3.5K0
PySpark数据类型转换异常分析
在使用PySpark的SparkSQL读取HDFS的文本文件创建DataFrame时,在做数据类型转换时会出现一些异常,如下:
Fayson
2018/03/29
5.3K0
PySpark数据类型转换异常分析
分布式机器学习原理及实战(Pyspark)
大数据(Big Data)是指无法在一定时间内用常规软件工具对其内容进行抓取、管理和处理的数据集合。大数据技术,是指从各种各样类型的数据中,快速获得有价值信息的能力。
算法进阶
2022/06/01
5.2K0
分布式机器学习原理及实战(Pyspark)
大数据【企业级360°全方位用户画像】基于USG模型的挖掘型标签开发
在上一篇博客,博主已经为大家简单地介绍了USG模型和决策树?《大数据【企业级360°全方位用户画像】之USG模型和决策树分类算法》。本篇博客,我们需要利用决策树算法,对用户画像中,处于USG模型下的用
大数据梦想家
2021/01/27
5550
大数据【企业级360°全方位用户画像】基于USG模型的挖掘型标签开发
相关推荐
scala-sparkML学习笔记:xgboost进行分布式训练
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验