Hi,
Could you please post the stack trace with the exception and also let us
know which Flink version you're using?
I have tried the following code and it works on
master/flink-1.11/flink-1.10:
  case class Foo(lang: String, count: Int)

  def main(args: Array[String]): Unit = {
    val senv
Hi,
Why can't I register the stream as a table? When I try, I get a
MalformedClassName exception.
  val serializer = new JSONKeyValueDeserializationSchema(false)
  val stream = senv.addSource(
    new FlinkKafkaConsumer(
      "tweets-raw-json",
      serializer,
      properties
    ).setStartFromEarliest() //