pyspark - how do we check if a column value is contained in a list

I see some ways to do this without using a udf.

You could use a list comprehension with pyspark.sql.functions.regexp_extract, exploiting the fact that an empty string is returned if there is no match.

Try to extract all of the values in the list l and concatenate the results. If the resulting concatenated string is an empty string, that means none of the values matched.
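
For context, here is a minimal, hypothetical setup for df and l that the snippets below assume. The actual data isn't shown in the question; the execution plans further down suggest l contains the single value 1, and the rows here are simply chosen to reproduce the sample output.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: rows chosen to match the sample output below.
df = spark.createDataFrame(
    [(0, 100), (0, 1), (1, 10), (3, 18), (3, 18), (3, 18), (2, 75), (2, 3)],
    ["id", "score"],
)
# Values to search for as substrings of score (l = [1] per the execution plans).
l = [1]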

For example:

from pyspark.sql.functions import concat, regexp_extract

# Concatenate the regexp_extract results for every value in l;
# a row is kept if the concatenation is non-empty (at least one value matched).
records = df.where(concat(*[regexp_extract("score", str(val), 0) for val in l]) != "")
records.show()
#+---+-----+
#| id|score|
#+---+-----+
#|  0|  100|
#|  0|    1|
#|  1|   10|
#|  3|   18|
#|  3|   18|
#|  3|   18|
#+---+-----+

If you take a look at the execution plan, you'll see that it's smart enough to cast the score column to string implicitly:

records.explain()
#== Physical Plan ==
#*Filter NOT (concat(regexp_extract(cast(score#11L as string), 1, 0)) = )
#+- Scan ExistingRDD[id#10L,score#11L]

Another way is to use pyspark.sql.Column.like (or similarly with rlike):

from functools import reduce
from pyspark.sql.functions import col

# Build a "%val%" LIKE pattern for each value and OR the per-value conditions together.
records = df.where(
    reduce(
        lambda a, b: a | b,
        map(
            lambda val: col("score").like(val.join(["%", "%"])),
            map(str, l)
        )
    )
)

Which produces the same output as above and has the following execution plan:

#== Physical Plan ==
#*Filter Contains(cast(score#11L as string), 1)
#+- Scan ExistingRDD[id#10L,score#11L]
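
For completeness, the rlike variant mentioned above could look like the sketch below; it treats each value in l as a regular-expression pattern and should keep the same rows as the like version.

from functools import reduce
from pyspark.sql.functions import col

# Sketch of the rlike variant: each value becomes a regex pattern,
# and the per-value conditions are OR-ed together.
records_rlike = df.where(
    reduce(
        lambda a, b: a | b,
        [col("score").rlike(str(val)) for val in l]
    )
)
records_rlike.show()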

If you want only the distinct records, you can do:

records.distinct().show()
#+---+-----+
#| id|score|
#+---+-----+
#|  0|    1|
#|  0|  100|
#|  3|   18|
#|  1|   10|
#+---+-----+