Kafka Streams filter issue

Updated: 2023-11-26 19:52:10

I'm trying to run a basic app from the following example:

https://github.com/confluentinc/examples/blob/3.3.x/kafka-streams/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala

However, I'm getting a compile error at this line:

// Variant 1: using `mapValues`
val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())

Error:(33, 25) missing parameter type for expanded function ((x$1) => x$1.toUpperCase())
    textLines.mapValues(_.toUpperCase())

Errors I get if I hover the cursor over the code:

Type mismatch, expected: ValueMapper[_ >: String, _ <: NotInferedVR], actual: (Any) => Any
Cannot resolve symbol toUpperCase

Contents of my sbt file:

name := "untitled1"

version := "0.1"

scalaVersion := "2.11.11"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/connect-api
libraryDependencies += "org.apache.kafka" % "connect-api" % "0.11.0.0"

I'm not sure how to proceed, as I'm quite new to Scala. I'd like to know what the issue is and how to fix it.

From http://docs.confluent.io/current/streams/faq.html#scala-compile-error-no-type-parameter-java-defined-trait-is-invariant-in-type-t

The root cause of this problem is Scala-Java interoperability: the Kafka Streams API is implemented in Java, but your application is written in Scala. Specifically, the problem comes from how the type systems of Java and Scala interact. Generic wildcards in Java, for example, often cause such issues in Scala code.

To fix the problem, you need to declare types explicitly in your Scala application so that the code compiles. For example, you may need to break a single statement that chains multiple DSL operations into multiple statements, where each statement explicitly declares its return type. The StreamToTableJoinScalaIntegrationTest demonstrates how the types of return variables are explicitly declared.
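As a rough sketch of what "declaring types explicitly" could look like for the `mapValues` line in the question: the version below passes an explicitly typed `ValueMapper[String, String]` instead of a bare lambda, so the compiler does not have to infer the parameter type from Java's wildcard signature. It assumes the Kafka Streams 0.11.0.0 API from the sbt file (`KStreamBuilder`). The object name `ExplicitTypesSketch` and the topic name `"TextLinesTopic"` are placeholders introduced for illustration, not taken from the question.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, ValueMapper}

// Hypothetical object name, used only for this sketch.
object ExplicitTypesSketch {

  // Explicitly typed ValueMapper: both the input and output types are
  // spelled out, so nothing has to be inferred from the Java wildcards.
  val toUpper: ValueMapper[String, String] = new ValueMapper[String, String] {
    override def apply(value: String): String = value.toUpperCase()
  }

  // Only builds the topology; nothing connects to a broker here.
  def build(builder: KStreamBuilder): KStream[Array[Byte], String] = {
    // "TextLinesTopic" is an assumed topic name.
    val textLines: KStream[Array[Byte], String] =
      builder.stream(Serdes.ByteArray(), Serdes.String(), "TextLinesTopic")

    // Each intermediate result declares its type explicitly.
    val uppercased: KStream[Array[Byte], String] = textLines.mapValues(toUpper)
    uppercased
  }
}
```

The anonymous class is more verbose than `_.toUpperCase()`, but it should compile on Scala 2.11, which does not convert function literals to Java single-method interfaces by default.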

Update

Kafka 2.0 (planned for release in June at the time of writing) contains a proper Scala API that avoids these issues. Compare https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams
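For comparison, here is a minimal sketch of the same transformation written against the Scala wrapper from KIP-270. It assumes the separate `kafka-streams-scala` artifact (version 2.0.0) is on the classpath. The object name `ScalaApiSketch` and the topic name `"TextLinesTopic"` are again placeholders, not part of the original question.

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

// Hypothetical object name, used only for this sketch.
object ScalaApiSketch {

  // Only builds the topology; nothing connects to a broker here.
  def build(): Topology = {
    val builder = new StreamsBuilder

    // Serdes are picked up implicitly from the imports above.
    // "TextLinesTopic" is an assumed topic name.
    val textLines: KStream[Array[Byte], String] =
      builder.stream[Array[Byte], String]("TextLinesTopic")

    // The wrapper's mapValues takes a plain Scala function,
    // so the lambda from the question type-checks as written.
    val uppercased: KStream[Array[Byte], String] =
      textLines.mapValues(_.toUpperCase())

    builder.build()
  }
}
```

With this wrapper the DSL methods take Scala functions rather than Java interfaces with wildcard type parameters, which is why the explicit `ValueMapper` workaround should no longer be needed.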