
How to iterate over records in Spark with Scala?

Updated: 2023-11-18 23:26:52

Here is a log from a session using spark-shell with a similar scenario.

Given

scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
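
For context, a DataFrame like persons could be built from a JSON file of such records; here is a minimal sketch, assuming a Spark 1.x sqlContext and a hypothetical people.json (neither is shown in the original session):

// Assumed setup, not part of the original session: read records like
// {"name":"Justin", "age":19} from a hypothetical people.json file.
val persons = sqlContext.read.json("people.json")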

Your problem looks like this:

scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]

So map just returns another RDD (the function is not applied immediately; it is applied "lazily" when you actually iterate over the result).

So when you materialize it (using collect()) you get a "normal" collection:

scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])

over which you can map. Note that in this case the closure passed to map has a side effect (the println), and the result of println is Unit:

scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())

The result is the same if collect is applied at the end:

scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())

But if you just want to print the rows, you can simplify this by using foreach:

scala> persons.foreach(t => println(t))
[Justin,19]

As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs on a cluster, collect is required as well:

persons.collect().foreach(t => println(t))
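
If the dataset is large, collecting every row to the driver can be costly; an alternative (my addition, not from the original answer) is to fetch only a bounded number of rows with the standard take method:

// Bring at most 10 rows back to the driver and print them there.
persons.take(10).foreach(println)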


Notes

  • The same behaviour can be observed in the Iterator class (see the sketch after this list).
  • The output of the session above has been reordered.
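
A minimal sketch of the Iterator point, in plain Scala with no Spark involved:

// map on an Iterator is lazy too: it builds a new iterator without
// running the closure until the iterator is actually consumed.
val it = Iterator(1, 2, 3).map(x => println(x))  // prints nothing yet
it.toList                                        // consuming it prints 1, 2, 3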

Update

As for filtering: the location of collect is "bad" if you apply filters after collect that could have been applied before it.

For example, these expressions give the same result:

scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]

scala> persons.collect().filter(r => r.getInt(1) > 20).foreach(println)
[Michael,29]
[Andy,30]

But the second case is worse, because that filter could have been applied before the collect.

The same applies to any kind of aggregation as well.
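
For instance (a sketch against the same persons DataFrame), a count pushed before collect runs distributed and ships only the final number to the driver, whereas counting after collect ships every row first:

// Aggregate on the cluster: only the resulting count travels to the driver.
persons.filter("age > 20").count()

// Aggregate on the driver: all rows cross the network before counting.
persons.collect().count(r => r.getInt(1) > 20)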