首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >从火花错误插入到CosmosDB

从火花错误插入到CosmosDB
EN

Stack Overflow用户
提问于 2018-04-05 09:50:21
回答 3查看 2.7K关注 0票数 4

我对Spark/CosmosDB/Python非常陌生,所以我正在研究MS站点和GitHub的代码示例,同时尝试自己创建一些东西。经过与Spark连接器的长期斗争,我能够从CosmosDB收集中读取数据.现在,我想做相反的(插入),但发现了另一个障碍。下面是我要介绍的示例:写信给宇宙数据库部分

我能够从宇宙阅读,并做一些数据,但我不能插入回宇宙。下面是我稍微修改过的代码:

代码语言:javascript
运行
复制
%%configure
{ "name":"Spark-to-Cosmos_DB_Connector", 
  "jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.2.0_2.11-1.1.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar", "wasb:///example/jars/1.0.0/json-20140107.jar", "wasb:///example/jars/1.0.0/rxjava-1.3.0.jar", "wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

# Read Configuration
readConfig = {
  "Endpoint" : "https://doctorwho.documents.azure.com:443/",
  "Masterkey" : "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
  "Database" : "DepartureDelays",
  "preferredRegions" : "Central US;East US2",
  "Collection" : "flights_pcoll", 
  "SamplingRatio" : "1.0",
  "schema_samplesize" : "1000",
  "query_pagesize" : "2147483647",
  "query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

# Write configuration
writeConfig = {
 "Endpoint" : "https://doctorwho.documents.azure.com:443/",
 "Masterkey" : "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
 "Database" : "DepartureDelays",
 "Collection" : "flights_pcoll",
 "Upsert" : "true"
}

# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()

所以,当我尝试运行这个程序时,我得到:

代码语言:javascript
运行
复制
An error occurred while calling o90.save.
: java.lang.UnsupportedOperationException: Writing in a non-empty collection.

快速搜索后,我尝试将模式(“追加”)添加到我的最后一行:

代码语言:javascript
运行
复制
flights.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

不幸的是,这给我留下了一个我无法理解的错误:

代码语言:javascript
运行
复制
An error occurred while calling o127.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 4.0 failed 4 times, most recent failure: Lost task 2.3 in stage 4.0 (TID 90, wn2-MDMstr.zxmmgisclg5udfemnv0v3qva3e.ax.internal.cloudapp.net, executor 2): java.lang.NoClassDefFoundError: com/microsoft/azure/documentdb/bulkexecutor/DocumentBulkExecutor

下面是完整的堆栈跟踪:巴斯托箱错误

有人能帮我解决这个错误吗?在使用我自己的cosmosDB时,我也收到了完全相同的错误,而不是文档中的示例错误。

我用的是带PySpark3内核的木星笔记本。星火版本2.2,HDInsight集群3.6。

编辑--我不想坐等回复,所以我在Scala上尝试了同样的方法。你猜怎么着?相同的错误(或至少非常相似):Scala误差

下面是我的Scala代码:

代码语言:javascript
运行
复制
%%configure
{ "name":"Spark-to-Cosmos_DB_Connector", 
  "jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.2.0_2.11-1.1.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar", "wasb:///example/jars/1.0.0/json-20140107.jar", "wasb:///example/jars/1.0.0/rxjava-1.3.0.jar", "wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

val readConfig = Config(Map(
  "Endpoint" -> "https://$my_cosmos_db.documents.azure.com:443/",
  "Masterkey" -> "$my_key",
  "Database" -> "test",
  "PreferredRegions" -> "West Europe",
  "Collection" -> "$my_collection", 
  "SamplingRatio" -> "1.0"
))
val docs = spark.read.cosmosDB(readConfig)

docs.show()

val writeConfig = Config(Map(
  "Endpoint" -> "https://$my_cosmos_db.documents.azure.com:443/",
  "Masterkey" -> "$my_key",
  "Database" -> "test",
  "PreferredRegions" -> "West Europe",
  "Collection" -> "$my_collection", 
  "WritingBatchSize" -> "100"
))




val someData = Seq(
    Row(8, "bat"),
    Row(64, "mouse"),
    Row(-27, "test_name")
)

val someSchema = List(
  StructField("number", IntegerType, true),
  StructField("name", StringType, true)
)

val someDF = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(someSchema)
)

someDF.show()

someDF.write.mode(SaveMode.Append).cosmosDB(writeConfig)

也许这将有助于故障排除。

谢谢!

EN

回答 3

Stack Overflow用户

发布于 2018-04-06 04:54:53

对于使用python时的第一个问题,请注意您使用的是doctorwho Azure Cosmos DB集合。这是一个演示集合,在这里我们提供了只读键,但没有写键。因此,您正在接收的错误是缺乏对集合的写访问权限。

对于第二个问题,来自pastebin的错误看起来是一样的。说到这里,一些快速的观察:

  • 您是否使用HDI 3.6,如果您是在Spark2.1上,而使用的罐子是用于Spark2.2。如果您使用HDI 3.7,那么它在Spark2.2上,然后使用正确的jar。
  • 您可能需要使用maven坐标来获取JAR的最新版本。有关更多信息,请注意azure-cosmosdb-spark > 使用木星笔记本
票数 3
EN

Stack Overflow用户

发布于 2018-05-18 11:55:18

经过与微软工程师的沟通和我自己进行的几次测试后,我发现Spark连接器存在一些问题。简单地说,最好的连接器版本是1.0.0,日期为15-11-2017年11月(Spark2.1和2.2)。下面的链接到存储库是一些对我有用的解决方案/解决方案。你可以试着用它们来为你找到最好的解决方案。

1)如果您使用Spark2.1或2.2,请使用1.0.0版中的连接器(上面的链接)。在我撰写此答案时(18-5-2018年),连接器的最新版本为1.1.1,日期为23-2018年3月-2018年3月--当需要将数据帧写入Cosmos DB或试图计数从Cosmos读取的超过50k的文档数据帧(无SQL db的50k文档是什么?)。

2)如果使用Spark2.1,->木星将与1.0.0连接器一起工作。如果你使用星火2.2 ->不要使用木星笔记本-它有一些问题,使用外部软件包,特别是在星火2.2安装。请使用齐柏林飞艇笔记本代替(与1.0.0连接器)。一旦你打开齐柏林飞艇,在右上角点击用户,然后是解释器.转到Livy解释器设置,单击“编辑并添加包坐标:com.microsoft.azure:azure-cosmosdb-spark_2.2.0_2.11:1.0.0

保存并重新启动解释器。然后使用livy2解释器创建一个新的笔记本。请注意,在齐柏林飞艇的每个单元格中,您必须在第一行中添加%pyspark魔术命令。运行第一个单元格将持续1-2分钟,因为启动整个应用程序。

3)您可以直接使用群集,而不是使用笔记本。在创建集群时,使用sshuser和密码,使用putty到您的集群中:

然后启动pyspark,附加uber-jar文件(您必须从存储库下载uber-jar文件,然后将其上传到连接到集群的blob存储。在我的案例中,文件位于名为示例的文件夹中(容器根目录中的第一级文件)。这里我还使用了1.0.0连接器。以下是命令:

代码语言:javascript
运行
复制
pyspark --master yarn --jars wasb:///example/azure-cosmosdb-spark_2.2.0_2.11-1.0.0-uber.jar

当火花准备好后,你可以粘贴和运行你的命令,一切都应该正常工作。

如果您有什么问题或有什么不清楚的地方,请告诉我。

票数 2
EN

Stack Overflow用户

发布于 2018-05-18 12:03:15

由于我找不到合适的解决方案,所以我想分享一下我的工作配置。我的配置用于HDI 3.6和Spark2.1。使用朱庇特笔记本的PySpark脚本成功地读写了宇宙文档DB中的数据。

代码语言:javascript
运行
复制
%%configure
{
 "name":"Spark-to-Cosmos_DB_Connector", 
 "jars": ["wasb:///cosmos-libs/azure-cosmosdb-spark_2.1.0_2.11-1.0.0-uber.jar"],
 "conf": {"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"}
}

读写信任、读取和保存命令与问题中所描述的完全相同。Write配置具有WritingBatchSize描述的附加参数这里。优步jar我从这个位置下载的。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49669292

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档