Hi,

could you please post the stacktrace with the exception and also let us know which Flink version you're using?

I have tried the following code and it works on master/flink-1.11/flink-1.10:

  case class Foo(lang: String, count: Int)
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val stenv = StreamTableEnvironment.create(senv)

    val source = senv.fromElements("Hello", "ciao")
    val mapped = source.map( e => {Foo(e, 13) } )
    stenv.registerDataStream("foo", mapped)

    senv.execute()
  }

It's not exactly your code but pretty similar and I use the same case class.

Best,
Aljoscha

On 09.07.20 22:44, Georg Heiler wrote:
Hi,

why can't I register the stream as a table and get a MalformedClassName
exception?

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
     new FlinkKafkaConsumer(
       "tweets-raw-json",
       serializer,
       properties
     ).setStartFromEarliest() // TODO experiment with different start values
   )

case class Foo(lang: String, count: Int)
val r = stream
     .map(e => {
       Foo(e.get("value").get("lang").asText(), 1)
     })
     .keyBy(_.lang)
     .timeWindow(Time.seconds(10))
     .sum("count")
r.print()
stenv.registerDataStream("tweets_json", r)

Best,
Georg


Reply via email to