
Dynamically adding new columns with withColumn from a lookup table

Updated: 2023-12-01 16:44:40

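With Java 8, you can use the three-argument Stream.reduce(identity, accumulator, combiner) overload, starting from the original dataframe and applying one withColumn call per map entry: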

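import static org.apache.spark.sql.functions.*;

import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final Dataset<Row> dataframe = ...;
final Map<String, String> codeSubstitutes = ...; // column name -> column to use when it is null

final Dataset<Row> afterSubstitutions = codeSubstitutes.entrySet().stream()
    .reduce(dataframe, (df, entry) ->
            df.withColumn(entry.getKey(), when(col(entry.getKey()).isNull(), col(entry.getValue()))
                    .otherwise(col(entry.getKey()))),
            (left, right) -> { throw new IllegalStateException("Can't merge two dataframes. This stream should not be a parallel one!"); }
    );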

The combiner (the last argument) is meant to merge two dataframes processed in parallel (if the stream were a parallel() stream). We simply disallow that by throwing, because this logic only ever runs on a sequential stream: entrySet().stream() returns a sequential stream unless parallel() is called on it.
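To see why a throwing combiner is acceptable here, the following plain-Java sketch (no Spark involved; CombinerDemo and reduceWithCountingCombiner are hypothetical names for illustration) runs the same three-argument reduce over a list of strings with a combiner that merely counts its invocations. In the JDK's stream implementation, a sequential reduce feeds every element through the accumulator and never calls the combiner, whereas a parallel reduce splits the input and merges the partial results with it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;

public class CombinerDemo {
    // Returns {total length of all words, number of combiner calls} for one reduce() run
    static int[] reduceWithCountingCombiner(boolean parallel) {
        List<String> words = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            words.add("w" + i);
        }
        AtomicInteger calls = new AtomicInteger();
        // Merges two partial sums and records that a merge happened
        BinaryOperator<Integer> countingCombiner = (left, right) -> {
            calls.incrementAndGet();
            return left + right;
        };
        int totalLength = (parallel ? words.parallelStream() : words.stream())
                .reduce(0, (acc, word) -> acc + word.length(), countingCombiner);
        return new int[] {totalLength, calls.get()};
    }

    public static void main(String[] args) {
        int[] sequential = reduceWithCountingCombiner(false);
        int[] parallel = reduceWithCountingCombiner(true);
        System.out.println("sequential: length=" + sequential[0] + ", combiner calls=" + sequential[1]);
        System.out.println("parallel:   length=" + parallel[0] + ", combiner calls=" + parallel[1]);
    }
}
```

Both runs produce the same total length; only the parallel run merges partial results. In the dataframe version there is no sensible way to merge two partially substituted dataframes, so throwing is a reasonable guard against an accidental parallel() call going unnoticed.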

A more readable and maintainable version takes the extra step of extracting the above logic into dedicated methods, for example:

    // ...
    Dataset<Row> nullSafeDf = codeSubstitutes.entrySet().stream()
        .reduce(dataframe, this::replaceIfNull, this::throwingCombiner);
    // ...
}


// Replaces nulls in the column named by the entry's key with values from the column named by its value
private Dataset<Row> replaceIfNull(Dataset<Row> df, Map.Entry<String, String> substitution) {
    final String original = substitution.getKey();
    final String replacement = substitution.getValue();
    return df.withColumn(original, when(col(original).isNull(), col(replacement))
            .otherwise(col(original)));
}

// Used as the reduce() combiner; a sequential stream should never call it
private <X> X throwingCombiner(X left, X right) {
    throw new IllegalStateException("Combining not allowed");
}
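The refactored fold can also be tried without a Spark cluster. The sketch below is a plain-Java analogue rather than Spark code: it assumes a hypothetical stand-in for Dataset<Row> (a List of column-name-to-value maps) and hypothetical names (NullSubstitutionDemo, substituteAll). Its replaceIfNull fills a null column from the substitute column, playing the role of the when(...).otherwise(...) expression, and the substitution map is folded with the same three-argument reduce and throwing combiner.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NullSubstitutionDemo {
    // Hypothetical stand-in for Dataset<Row>: a list of rows, each a column name -> value map.
    // Like withColumn, this returns new rows and leaves its input unchanged.
    static List<Map<String, String>> replaceIfNull(List<Map<String, String>> rows,
                                                   Map.Entry<String, String> substitution) {
        final String original = substitution.getKey();
        final String replacement = substitution.getValue();
        return rows.stream()
                .map(row -> {
                    Map<String, String> copy = new HashMap<>(row);
                    // Plays the role of when(col(original).isNull(), col(replacement)).otherwise(col(original))
                    if (copy.get(original) == null) {
                        copy.put(original, row.get(replacement));
                    }
                    return copy;
                })
                .collect(Collectors.toList());
    }

    // Same role as the answer's combiner: a sequential fold should never need it
    static <X> X throwingCombiner(X left, X right) {
        throw new IllegalStateException("Combining not allowed");
    }

    // Folds every substitution over the rows, one replaceIfNull step per map entry
    static List<Map<String, String>> substituteAll(List<Map<String, String>> rows,
                                                   Map<String, String> substitutes) {
        return substitutes.entrySet().stream()
                .reduce(rows, NullSubstitutionDemo::replaceIfNull, NullSubstitutionDemo::throwingCombiner);
    }

    public static void main(String[] args) {
        Map<String, String> first = new HashMap<>();
        first.put("name", null);
        first.put("nickname", "Al");
        Map<String, String> second = new HashMap<>();
        second.put("name", "Bob");
        second.put("nickname", "Bobby");

        Map<String, String> substitutes = new LinkedHashMap<>();
        substitutes.put("name", "nickname");

        List<Map<String, String>> result = substituteAll(Arrays.asList(first, second), substitutes);
        System.out.println(result.get(0).get("name") + ", " + result.get(1).get("name")); // prints "Al, Bob"
    }
}
```

The map's iteration order decides the order in which substitutions are applied. That only matters when a substitute column is itself being replaced, and the sketch uses a LinkedHashMap to keep the order predictable.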