Hi, it looks like the flink-connector-kafka jar is not available where the job is running. Did you put it in the lib folder of Flink on all the machines, or did you submit it with the job?
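If you submit it with the job, the connector has to be packaged inside the job jar itself. A rough sketch of what that could look like, assuming you build with sbt and the sbt-assembly plugin (both are guesses on my side, and the version number below is only a placeholder that should match your cluster):

```scala
// build.sbt (sketch only; build tool, plugin and version are assumptions)
val flinkVersion = "0.9.0" // assumed, use the version your cluster runs

libraryDependencies ++= Seq(
  // Assumed to be on the cluster classpath already, so kept out of the job jar.
  "org.apache.flink" % "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" % "flink-streaming-scala" % flinkVersion % "provided",
  // The connector must end up inside the jar you submit; otherwise the
  // TaskManagers cannot load KafkaSink and fail with ClassNotFoundException.
  "org.apache.flink" % "flink-connector-kafka" % flinkVersion
)
```

With a setup like this, "sbt assembly" should produce a single jar to submit. Listing its contents with "jar tf" and looking for org/apache/flink/streaming/connectors/kafka/api/KafkaSink.class is a quick way to confirm the connector was bundled.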
On Thu, Jul 16, 2015, 21:05 Wendong <wendong....@gmail.com> wrote:

> Hi Gyula,
>
> Cool. I removed .print and the error was gone.
>
> However, env.execute failed with errors:
>
> .........
> Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504)
> .......
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
> .......
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.api.KafkaSink
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>
> In the following code:
>
> val stream = env
>   .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
>   .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))
>
> Anything wrong? I already did "import
> org.apache.flink.streaming.connectors.kafka.api._". Class SimpleStringSchema
> was modified (see previous post).
>
> Thanks,
>
> Wendong
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-example-in-Scala-tp2069p2112.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.