Good day, everyone.
I have profile restrictions expressed as SQL expressions, for example:

((col1 = 'ValueA 3') or (col2 = 'ValueB 2'))      -- a common SQL expression
((NOT col1 = 'ValueA N') and (col3 = 'ValueC 2')) -- any combination of logical operators is possible

For example, I have a source dataset:
+-----------+----------+----------+
|   Col1    |   Col2   |   Col3   |
+-----------+----------+----------+
| ValueA 1  | ValueB 2 | ValueC 3 |
| ValueA 1  | ValueB 3 | ValueC 4 |
+-----------+----------+----------+

From it, I need to get the following dataset:
+-----------+----------+----------+----------+
|   Col1    |   Col2   |   Col3   | Profile1 |
+-----------+----------+----------+----------+
| ValueA 1  | ValueB 2 | ValueC 3 |    1     |
| ValueA 1  | ValueB 3 | ValueC 4 |    0     |
+-----------+----------+----------+----------+
I know how to do this with joins (filter the source dataset by the sql_expr, add a column with withColumn, join back, and so on). But I have about 100 profiles, and I am not going to do 100 joins. I am not looking for a ready-made solution; some advice on how to make this efficient would be the point. My idea is to build a collection of profile restrictions (profile_id, sql_expression), map over each row to produce a column holding the matching profile_ids, and finally flat-map the result.
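The mapping idea above can be sketched without any joins: keep the restrictions as (profile_id, sql_expression) pairs, turn each expression into a 0/1 column with expr() and when(), and select them all in one pass over the data. A minimal sketch, assuming Spark in local mode; the profile id and expression below are invented for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, lit, when}

val spark = SparkSession.builder.master("local[*]").appName("profiles").getOrCreate()
import spark.implicits._

val source = Seq(
  ("ValueA 1", "ValueB 2", "ValueC 3"),
  ("ValueA 1", "ValueB 3", "ValueC 4")
).toDF("Col1", "Col2", "Col3")

// hypothetical restriction collection: (profile_id, sql_expression)
val profiles = Seq(
  ("Profile1", "Col1 = 'ValueA 1' AND Col2 = 'ValueB 2'")
)

// each profile becomes one 0/1 column; all ~100 of them are evaluated in a
// single pass over the data, with no joins and no set operations
val profileCols = profiles.map { case (id, sqlExpr) =>
  when(expr(sqlExpr), lit(1)).otherwise(lit(0)).as(id)
}
val result = source.select(col("*") +: profileCols: _*)
result.show()
```

Because every expression is evaluated per row inside one projection, adding more profiles only widens the output, it does not add passes over the data.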
UPDATE 1: for now I use the solution below, but I cannot test it because locally it never finishes.
@Override
public <V extends SomeData, T extends ObjWithRestr> Dataset<Row> filterByMultipleRestrictionObjs(Dataset<V> source,
                                                                                                 List<T> objsWithRestr,
                                                                                                 Class<V> tClass) {
    // Row is not a JavaBean, so Encoders.bean(Row.class) would fail; toDF() converts instead
    Dataset<Row> resultDataset = source.toDF();
    for (T objWithRestr : objsWithRestr) {
        Profile profile = (Profile) objWithRestr;
        String client_id = profile.getClient_id();
        ProfileRestrictions profileRestrictions = gsonAdapter
                .fromJson(new StringReader(objWithRestr.getRestrictions()), ProfileRestrictions.class);
        String combinedFilter = getCombinedFilter(profileRestrictions.getDemoFilter(), profileRestrictions.getMediaFilter());
        // split the dataset into matching and non-matching halves, tag each half, reunite
        Dataset<Row> filteredDataset = resultDataset.filter(combinedFilter);
        Dataset<Row> falseDataset = resultDataset.exceptAll(filteredDataset).withColumn(client_id, lit(0));
        Dataset<Row> trueDataset = resultDataset.intersectAll(filteredDataset).withColumn(client_id, lit(1));
        resultDataset = falseDataset.unionByName(trueDataset);
    }
    return resultDataset;
}

Posted on 2019-09-27 17:43:26
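A likely reason the loop above never finishes locally: each iteration references resultDataset three times (filter, exceptAll, intersectAll), so the unexecuted query plan roughly triples in size per profile, and both set operations shuffle and compare entire rows. A hedged sketch of the same per-profile tagging without the set operations, with a lineage truncation in case the plan still grows too large (the sample data and the profileFilters pairs here stand in for the (client_id, combinedFilter) values produced in the Java method above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{expr, lit, when}

val spark = SparkSession.builder.master("local[*]").appName("restr").getOrCreate()
import spark.implicits._

val source = Seq(("ValueA 1", "ValueB 2"), ("ValueA 1", "ValueB 3")).toDF("Col1", "Col2")

// stand-ins for the (client_id, combinedFilter) pairs built in the Java method
val profileFilters = Seq(("client_1", "Col2 = 'ValueB 2'"))

var resultDataset = source.toDF()
for ((clientId, combinedFilter) <- profileFilters) {
  // one conditional column replaces filter + exceptAll + intersectAll + unionByName
  resultDataset = resultDataset
    .withColumn(clientId, when(expr(combinedFilter), lit(1)).otherwise(lit(0)))
    .localCheckpoint() // truncate the lineage so the plan does not keep growing
}
```

With ~100 profiles, the localCheckpoint() call could also be done every N iterations rather than every time; it trades recomputability for a bounded plan size.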
I believe the approach below can solve the issue.
Your filter condition values:
filter_col1|filter_col2
valueA 3|ValueB 2
valueA 4|ValueB 3
valueA 5|ValueB 4
valueA 6|ValueB 5
//read them and convert them into a dataframe - filter_cond_df
//create a temp view on top of filter_cond_df
filter_cond_df.createOrReplaceTempView("filter_temp")
Your input data:
+-----------+----------+----------+
| Col1 | Col2 | Col3 |
+-----------+----------+----------+
| ValueA 1 | ValueB 2 | ValueC 3 |
| ValueA 1 | ValueB 3 | ValueC 4 |
+-----------+----------+----------+
//consider this as input_df and create a temp view on top of it
input_df.createOrReplaceTempView("input_temp")
//get only the rows matching your filter condition
val matching_df = spark.sql("""select * from input_temp where col1 in (select filter_col1 from filter_temp) or col2 in (select filter_col2 from filter_temp)""")
//get the remaining (non-matching) rows from your input
val notmatching_df = input_df.except(matching_df)
//add a profile column with value 1 to matching_df
val result1 = matching_df.withColumn("profile", lit(1))
//add a profile column with value 0 to notmatching_df
val result2 = notmatching_df.withColumn("profile", lit(0))
val final_result = result1.union(result2)
I hope this helps!

Source: https://stackoverflow.com/questions/58137003