
How to add the Kafka timestamp to Spark Streaming when converting to a DataFrame

Updated: 2022-12-06 18:20:14

A Kafka record carries several attributes besides its value: key, topic, partition, offset, timestamp, and timestampType.

See https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

Note that for Kafka there are both a streaming and a batch approach.
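The batch side can be sketched as follows, assuming a SparkSession named sparkSession as in the streaming example further down; spark.read (rather than readStream) returns a static DataFrame with the same columns, including timestamp:

```scala
// Batch read sketch: same broker and topic as the streaming example.
// "startingOffsets"/"endingOffsets" bound the range read in one shot.
val batchDf = sparkSession.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "AAA")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

batchDf.show(false)
```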

Example:

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()

import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")
val socketStreamDs = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "AAA")
  .option("startingOffsets", "earliest")
  .load()
  // the timestamp column could also be cast to a string:
  //.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS STRING)")
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Append())
  .start()
  .awaitTermination()

My sample output looks like this:

-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+-----------------------+
|key |value|timestamp              |
+----+-----+-----------------------+
|null|RRR  |2019-02-07 04:37:34.983|
|null|HHH  |2019-02-07 04:37:36.802|
|null|JJJ  |2019-02-07 04:37:39.1  |
+----+-----+-----------------------+
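Once the timestamp column is available, it can serve as the event time for windowed aggregations; a minimal sketch, where the 10-minute watermark and 5-minute window are arbitrary illustrative choices:

```scala
// Hypothetical follow-up: use the Kafka timestamp as event time.
// Requires `import sparkSession.implicits._` (already done above) for $"...".
val counts = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "AAA")
  .load()
  .withWatermark("timestamp", "10 minutes")    // tolerate 10 minutes of late data
  .groupBy(window($"timestamp", "5 minutes"))  // tumbling 5-minute windows
  .count()
```

With a watermark in place, this aggregation can also run in Append output mode instead of Complete.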

For the non-Structured-Streaming (DStream) approach, you simply expand on the statement above:

stream.map { record => (record.timestamp(), record.key(), record.value()) }
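In context, that map sits on a direct stream of ConsumerRecords. A minimal sketch, assuming the same broker and topic as above; the batch interval and the group id "example-group" are assumptions for illustration:

```scala
// DStream sketch: each ConsumerRecord exposes the broker-assigned
// timestamp directly via record.timestamp().
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "example-group"  // assumed consumer group
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("AAA"), kafkaParams))

// Pull the timestamp alongside key and value.
val withTs = stream.map { record => (record.timestamp(), record.key(), record.value()) }
withTs.print()

ssc.start()
ssc.awaitTermination()
```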