且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

在所有列火花上应用功能

更新时间:2023-11-15 20:33:04

在pyspark中实现起来非常容易,但是在尝试将其重写为scala代码时遇到了麻烦,我希望您能以某种方式进行管理.

This is quite easy to implement in pyspark, but I run into touble trying to rewrite this to scala code... I hope you will manage it somehow.

from pyspark.sql.functions import *
df = spark.createDataFrame([(100, "4.5", "5.6")], ["new_time", "col1", "col2"])
columns = [col(c).cast('float') if c != 'new_time' else col(c) for c in df.columns]
aggs = [avg(c) for c in df.columns if c != 'new_time']
finalresult = df.select(columns).groupBy('new_time').agg(*aggs)
finalresult.explain()

*HashAggregate(keys=[new_time#0L], functions=[avg(cast(col1#14 as double)), avg(cast(col2#15 as double))])
+- Exchange hashpartitioning(new_time#0L, 200)
   +- *HashAggregate(keys=[new_time#0L], functions=[partial_avg(cast(col1#14 as double)), partial_avg(cast(col2#15 as double))])
      +- *Project [new_time#0L, cast(col1#1 as float) AS col1#14, cast(col2#2 as float) AS col2#15]
         +- Scan ExistingRDD[new_time#0L,col1#1,col2#2]