前些时候和后台对接,需要用pyspark获取MongoDB、MySQL数据,本文将介绍如何使用PySpark与MongoDB、MySQL进行数据交互。MongoDB是一个基于分布式文件存储的数据库,由C++语言编写。它旨在为Web应用提供可扩展的高性能数据存储解决方案。
pip install pyspark
命令安装下面是一个简单的PySpark脚本,用于从MongoDB中读取数据:
#!/usr/bin/python3
# coding=utf-8
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession \
.builder \
.appName("MongoSparkConnectorIntro") \
.config("spark.mongodb.input.uri", "mongodb://username:password@host1:port,host2:port/dbName.collectionName?authSource=admin") \
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.4") \
.enableHiveSupport() \
.getOrCreate()
# 读取mongodb中的数据
df = spark.read \
.format("com.mongodb.spark.sql.DefaultSource") \
.load()
# 打印数据
df.show()
spark.stop()
在这个脚本中需要注意根据实际情况修改URI中的用户名、密码、主机、端口、数据库名和集合名。最后使用spark.read.format().load()
方法从MongoDB中读取数据,并将其存储在DataFrame中。
#!/usr/bin/python3
# coding=utf-8
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession \
.builder \
.appName("PySparkMySQLConnectorIntro") \
.config("spark.jars", "/path/to/mysql-connector-java-x.x.xx.jar") \
.getOrCreate()
# 读取MySQL中的数据
df = spark.read \
.format("jdbc") \
.option("url", "jdbc:mysql://hostname:port/dbname") \
.option("dbtable", "tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
# 打印数据
df.show()
spark.stop()
MySQL与MongoDB类似,故不赘述。
在使用此脚本时,需要注意以下几点:
spark.jars.packages
设置,指定MongoDB Spark Connector的版本。注意,最后的2.11
是Scala版本,通常不需要更改;2.4.4
是Spark版本,需要根据实际使用的Spark版本进行修改。authSource=admin
参数。具体示例请参见2.1代码中的第12行。(MongoDB常用的查询语句可以参考):
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。