Updated: 2022-12-06 18:20:14
A Kafka record carries several attributes (key, value, topic, partition, offset, timestamp, timestampType).
See https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
Note that Kafka can be consumed either as a stream or as a bounded batch.
Example:
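For the batch path, a minimal sketch: use spark.read instead of spark.readStream and bound the range with endingOffsets. The broker address, topic name "AAA", and the session variable name sparkSession are assumptions matching the streaming example in this post.

```scala
// Batch (bounded) read from Kafka: spark.read instead of spark.readStream.
// Broker, topic, and sparkSession are assumed from the streaming example.
val batchDf = sparkSession.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "AAA")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")   // batch-only option: where to stop
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

batchDf.show(truncate = false)
```

Unlike the streaming query, this returns a regular DataFrame and finishes once the offset range has been read.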
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()

import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")

// Read from the Kafka topic "AAA", starting from the earliest offsets.
// key and value arrive as binary, so they are cast to strings; the
// timestamp column is already a proper timestamp and is selected as-is.
val socketStreamDs = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "AAA")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

// Print each micro-batch to the console and block until the query stops.
socketStreamDs.writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Append())
  .start()
  .awaitTermination()
Sample output from my run:
-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+-----------------------+
|key |value|timestamp |
+----+-----+-----------------------+
|null|RRR |2019-02-07 04:37:34.983|
|null|HHH |2019-02-07 04:37:36.802|
|null|JJJ |2019-02-07 04:37:39.1 |
+----+-----+-----------------------+
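Because the query keeps key and value as strings and timestamp as a timestamp, the untyped rows can also be mapped onto a typed Dataset. A minimal sketch, assuming the same sparkSession, imports, and topic as in the example above (the case class name KafkaMessage is illustrative):

```scala
// Sketch: map the casted columns onto a typed Dataset.
// Relies on `import sparkSession.implicits._` and `java.sql.Timestamp`
// from the example above; KafkaMessage is a hypothetical name.
case class KafkaMessage(key: String, value: String, timestamp: Timestamp)

val typedDs = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "AAA")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .as[KafkaMessage]
```

The column names and types must line up with the case class fields for the encoder to resolve.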
With the non-structured (DStream-based) streaming API, you simply unpack the record yourself:
stream.map { record => (record.timestamp(), record.key(), record.value()) }
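The `stream` above is a DStream of ConsumerRecord from the older spark-streaming-kafka-0-10 API; its creation is not shown in the original, so here is a hedged sketch of how it would typically be set up (the group id and batch interval are assumptions):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Assumed setup: reuse the SparkContext from the session built earlier,
// with a 5-second micro-batch interval.
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",   // illustrative group id
  "auto.offset.reset" -> "earliest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("AAA"), kafkaParams)
)

// Each ConsumerRecord exposes the attributes directly:
stream.map { record => (record.timestamp(), record.key(), record.value()) }
  .print()

ssc.start()
ssc.awaitTermination()
```

Here the deserialization is configured on the consumer itself, so no CAST expressions are needed; each record's attributes come from plain accessor methods.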