
How to get the hourly average of event occurrences from a list of timestamps/dates in Scala

Updated: 2023-01-29 16:58:32

I tried to solve this using the approach below.

Please note that the code runs in IST (GMT+5:30). Therefore the timestamps 2018-04-07 07:07:17 and 2018-04-07 07:32:27 fall into different hour buckets (the first in 6:30-7:30, the second in 7:30-8:30).
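The half-hour boundary effect can be sketched in plain Scala (no Spark) with `java.time`; the zone id `Asia/Kolkata` is assumed here for IST:

```scala
import java.time.{LocalDateTime, ZoneId}

// Assumed zone for IST (GMT+5:30), as stated above.
val zone = ZoneId.of("Asia/Kolkata")

// Same bucketing as the Spark job below: floor(epochSeconds / 3600).
def hourBucket(ts: String): Long = {
  val epoch = LocalDateTime.parse(ts.replace(' ', 'T')).atZone(zone).toEpochSecond
  epoch / (60 * 60) // floor division (epochs here are positive)
}

val a = hourBucket("2018-04-07 07:07:17")
val b = hourBucket("2018-04-07 07:32:27")
// With a +5:30 offset, whole UTC hours fall on the local half hour,
// so these two local times land in adjacent buckets.
println(a != b) // true
```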

Code

  1. Read the data

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

val spark = sqlContext.sparkSession
import spark.implicits._

val data =
  """
    |2018-04-07 07:07:17
    |2018-04-07 07:32:27
    |2018-04-07 08:36:44
    |2018-04-07 08:38:00
    |2018-04-07 08:39:29
    |2018-04-08 01:43:08
    |2018-04-08 01:43:55
    |2018-04-09 07:52:31
    |2018-04-09 07:52:42
    |2019-01-24 11:52:31
    |2019-01-24 12:52:42
    |2019-01-25 12:52:42
  """.stripMargin

// parse each line as a single timestamp column
val df = spark.read
  .schema(StructType(Array(StructField("date_time", DataTypes.TimestampType))))
  .csv(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()

Result:

+-------------------+
|date_time          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
|2019-01-24 11:52:31|
|2019-01-24 12:52:42|
|2019-01-25 12:52:42|
+-------------------+
root
 |-- date_time: timestamp (nullable = true)

  2. Bucketize the data and find the count for each hour

val hour = 60 * 60
// convert each timestamp to unix epoch seconds, then floor-divide into hour buckets
val processedDF = df.withColumn("unix_epoch", unix_timestamp(col("date_time")))
  .withColumn("hour_bucket", floor(col("unix_epoch") / hour))
  .groupBy("hour_bucket")
  .count()

processedDF.show(false)

Result:

+-----------+-----+
|hour_bucket|count|
+-----------+-----+
|423073     |1    |
|423074     |1    |
|423075     |3    |
|423092     |2    |
|423122     |2    |
|430087     |1    |
|430086     |1    |
|430111     |1    |
+-----------+-----+

  3. Find the hourly average

// average event count across the occupied hour buckets
processedDF.agg(avg("count")).show(false)

Result:

+----------+
|avg(count)|
+----------+
|1.5       |
+----------+
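As a sanity check, the same average can be reproduced without Spark using plain Scala collections and the same bucketing rule (assuming the `Asia/Kolkata` zone for IST):

```scala
import java.time.{LocalDateTime, ZoneId}

val zone = ZoneId.of("Asia/Kolkata") // assumed IST zone id
val stamps = Seq(
  "2018-04-07 07:07:17", "2018-04-07 07:32:27", "2018-04-07 08:36:44",
  "2018-04-07 08:38:00", "2018-04-07 08:39:29", "2018-04-08 01:43:08",
  "2018-04-08 01:43:55", "2018-04-09 07:52:31", "2018-04-09 07:52:42",
  "2019-01-24 11:52:31", "2019-01-24 12:52:42", "2019-01-25 12:52:42")

// bucket = floor(epochSeconds / 3600), matching the Spark job
val buckets = stamps.map { s =>
  LocalDateTime.parse(s.replace(' ', 'T')).atZone(zone).toEpochSecond / 3600
}
val counts = buckets.groupBy(identity).map(_._2.size)
val avg = counts.sum.toDouble / counts.size
println(avg) // 12 events over 8 occupied hour buckets = 1.5
```

Note that this averages only over hours that contain at least one event; empty hours between the first and last timestamp are not counted in the denominator.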

Hope this helps!