使用Spark/Scala将JSON字符串格式化为MongoDB文档样式的方法如下:
import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("JSON to MongoDB")
.config("spark.mongodb.output.uri", "mongodb://localhost/mydb.collection")
.getOrCreate()
其中,mongodb://localhost/mydb.collection
是MongoDB的连接URI,mydb.collection
是要写入的数据库和集合名称。
val jsonString = """{"name": "John", "age": 30, "city": "New York"}"""
val jsonDF = spark.read.json(Seq(jsonString).toDS())
val formattedDF = jsonDF.withColumn("_id", monotonically_increasing_id())
这里使用monotonically_increasing_id()
函数为每个文档生成唯一的_id
字段。
formattedDF.write
.format("com.mongodb.spark.sql.DefaultSource")
.mode(SaveMode.Append)
.save()
完整的代码示例:
import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("JSON to MongoDB")
.config("spark.mongodb.output.uri", "mongodb://localhost/mydb.collection")
.getOrCreate()
val jsonString = """{"name": "John", "age": 30, "city": "New York"}"""
val jsonDF = spark.read.json(Seq(jsonString).toDS())
val formattedDF = jsonDF.withColumn("_id", monotonically_increasing_id())
formattedDF.write
.format("com.mongodb.spark.sql.DefaultSource")
.mode(SaveMode.Append)
.save()
注意:在运行代码之前,需要确保已经正确配置了Spark和MongoDB的环境,并且已经添加了相关的依赖。
推荐的腾讯云相关产品:腾讯云数据库 MongoDB,产品介绍链接地址:https://cloud.tencent.com/product/mongodb
领取专属 10元无门槛券
手把手带您无忧上云