问:如何高效地将Spark DataFrame中包含值的列写入Cassanrda?(在最小的Scala代码行中高效,而不是在Cassandra中创建一堆tombstones,让它快速运行,等等)
我有一个包含两个键列和300个潜在描述符值的Cassandra表。
create table sample {
key1 text,
key2 text,
0 text,
............
299 text,
PRIMARY KEY (key1, key2)
}
我有一个与底层表匹配的Spark数据帧,但数据帧中的每一行都非常稀疏-除了两个键值之外,特定行可能只有4到5个“描述符”(列0->299)具有一个值。
我目前正在将Spark dataframe转换为RDD,并使用saveRdd来写入数据。
这是可行的,但是当没有值时,"null“被存储在列中。
例如:
val saveRdd = sample.rdd
saveRdd.map(line => (
line(0), line(1), line(2),
line(3), line(4), line(5),
line(6), line(7), line(8),
line(9), line(10), line(11),
line(12), line(13), line(14),
line(15), line(16), line(17),
line(18), line(19), line(20))).saveToCassandra..........
在Cassandra中创建以下代码:
XYZ | 10 | 49849 |F|| null || null | TO11142017_Import | null | 20 | null | nullnull |空null | Scott Dick-Peddie | null | null | null| null | null | null |空null |2014.7.13 0:00 | null |0| null || null |8| null || nullnull |位置| null |位置| null | null
在SparkSession上设置spark.cassandra.output.ignoreNulls不起作用:
spark.conf.set("spark.cassandra.output.ignoreNulls", "true")
spark.conf.get("spark.cassandra.output.ignoreNulls")
这也不起作用:
spark-shell --conf spark.cassandra.output.ignoreNulls=true
(尝试了不同的设置方式,但我设置的方式似乎都不起作用)
withColumn
和filter似乎不是合适的解决方案。未设置的概念可能是正确的,但不确定在这种情况下如何使用它。
cassandra.3.11.2
spark-cassandra-连接器:2.3.0-s_2.11
火花2.2.0.2.6.3.0-235
谢谢!
发布于 2018-11-06 16:05:50
您确定ignoreNulls
不适用于您吗?当给定的单元格中没有值时,Cassandra输出null
。您可以使用sstabledump
工具检查数据是否真的写入了SSTable -您肯定会看到附加了删除信息的单元格(这就是nulls的存储方式)。
下面是不使用ignoreNulls
(默认)并将ignoreNulls
设置为true
的情况下运行Spark的示例。测试是在DSE 5.1.11上完成的,它有旧版本的连接器,但与Cassandra 3.11匹配。
让我们创建一个测试表,如下所示:
create table test.t3 (id int primary key, t1 text, t2 text, t3 text);
在没有ignoreNulls
的情况下,我们需要以下代码进行测试:
case class T3(id: Int, t1: Option[String], t2: Option[String], t3: Option[String])
val rdd = sc.parallelize(Seq(new T3(1, None, Some("t2"), None)))
rdd.saveToCassandra("test", "t3")
如果我们使用cqlsh
查看数据,我们将看到以下内容:
cqlsh:test> SELECT * from test.t3;
id | t1 | t2 | t3
----+------+----+------
1 | null | t2 | null
(1 rows)
在做完nodetool flush
之后,我们可以研究一下SSTables。这就是我们将在这里看到的:
>sstabledump mc-1-big-Data.db
[
{
"partition" : {
"key" : [ "1" ],
"position" : 0
},
"rows" : [
{
"type" : "row",
"position" : 30,
"liveness_info" : { "tstamp" : "2018-11-06T07:53:38.418171Z" },
"cells" : [
{ "name" : "t1", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
},
{ "name" : "t2", "value" : "t2" },
{ "name" : "t3", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
}
]
}
]
}
]
您可以看到,对于为nulls的列t1
和t3
,有一个字段deletion_info
。
现在,让我们使用TRUNCATE test.t3
删除数据,并在ignoreNulls
设置为true的情况下再次启动spark-shell:
dse spark --conf spark.cassandra.output.ignoreNulls=true
在执行相同的Spark代码之后,我们将在cqlsh
中看到相同的结果
cqlsh:test> SELECT * from test.t3;
id | t1 | t2 | t3
----+------+----+------
1 | null | t2 | null
但是在执行flush之后,sstabledump
显示了完全不同的画面:
>sstabledump mc-3-big-Data.db
[
{
"partition" : {
"key" : [ "1" ],
"position" : 0
},
"rows" : [
{
"type" : "row",
"position" : 27,
"liveness_info" : { "tstamp" : "2018-11-06T07:56:27.035600Z" },
"cells" : [
{ "name" : "t2", "value" : "t2" }
]
}
]
}
]
如您所见,我们只有t2
列的数据,没有提到t3
和t1
列为nulls。
https://stackoverflow.com/questions/53162033
复制相似问题