更新时间:2023-11-18 23:26:52
以下是在类似情况下使用spark-shell
的会话中的日志.
Here is a log from a session using spark-shell
with a similar scenario.
给予
scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
您的问题看起来像
scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]
因此,map
仅返回另一个RDD(该函数不会立即应用,当您真正遍历结果时将延迟"应用该函数).
so map
just returns another RDD (the function is not applied immediately, the function is applied "lazily" when you really iterate over the result).
因此,当您实现(使用collect()
)时,您将获得一个正常"集合:
So when you materialize (using collect()
) you get a "normal" collection:
scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])
可以在其上map
.请注意,在这种情况下,传递给map
(println
)的闭包中有副作用,println
的结果为Unit
):
over which which you can map
. Note that in this case you have a side-effect in the closure passed to map
(the println
), the result of println
is Unit
):
scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())
如果在末尾应用collect
,则结果相同:
Same result if collect
is applied at the end:
scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())
但是,如果您只想打印行,则可以将其简化为使用foreach
:
But if you just want to print the rows, you can simplify it to using foreach
:
scala> persons.foreach(t => println(t))
[Justin,19]
正如@RohanAletty在评论中指出的那样,这适用于本地Spark作业.如果作业在群集中运行,则还需要collect
:
As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs in a cluster, collect
is required as well:
persons.collect().foreach(t => println(t))
注释
Notes
Iterator
类中可以观察到相同的行为.Iterator
class.更新
关于过滤:如果在collect
之后应用可以在之前应用的过滤器,则collect
的位置为错误".
As for filtering: The location of collect
is "bad", if you apply filters after collect
which can be applied before.
例如,这些表达式给出相同的结果:
For example these expressions give the same result:
scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]
scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]
,但第二种情况更糟,因为该过滤器可能早于collect
就应用了.
but the 2nd case is worse, because that filter could have been applied before collect
.
这同样适用于任何类型的聚合.
The same applies to any type of aggregation as well.