Hi,

why can't I register the stream as a table? When I try, I get a MalformedClassName exception.

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
  new FlinkKafkaConsumer(
    "tweets-raw-json",
    serializer,
    properties
  ).setStartFromEarliest() // TODO experiment with different start values
)

case class Foo(lang: String, count: Int)

val r = stream
  .map(e => {
    Foo(e.get("value").get("lang").asText(), 1)
  })
  .keyBy(_.lang)
  .timeWindow(Time.seconds(10))
  .sum("count")
r.print()
stenv.registerDataStream("tweets_json", r)

Best,
Georg