Hi Gyula, Cool. I removed .print and the error was gone.
However, env.execute failed with errors: ......... Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504) ....... Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185) ....... Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.api.KafkaSink at java.net.URLClassLoader$1.run(URLClassLoader.java:366) In the following code: val stream = env .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema)) .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema)) Anything wrong? I already did "import org.apache.flink.streaming.connectors.kafka.api._". Class SimpleStringSchema was modified (see previous post). Thanks, Wendong -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-example-in-Scala-tp2069p2112.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.