Here is my simple program to use Kafka: import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord} import org.apache.flink.streaming.api.environment._ import org.apache.flink.streaming.connectors.kafka import org.apache.flink.streaming.connectors.kafka.api._ import org.apache.flink.streaming.util.serialization._ import org.apache.flink.api.common.typeinfo._
object TestKafka { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema)) .addSink(new KafkaSink[String]("localhost:2181", "test", new JavaDefaultStringSchema)) env.execute("Test Kafka") } } in build.sbt: libraryDependencies ++= Seq("org.apache.flink" % "flink-scala" % "0.9.0", "org.apache.flink" % "flink-clients" % "0.9.0") libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1" libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0" libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}") libraryDependencies += "com.101tec" % "zkclient" % "0.5" I'm using "sbt assembly" to build a fat jar, so the target jar file is supposed to contain everything. However, there are errors when running the target jar file: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69) at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:105) at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.initializeConnection(KafkaSource.java:175) at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.open(KafkaSource.java:207) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.openOperator(StreamTask.java:158) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:52) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: org.I0Itec.zkclient.serialize.ZkSerializer at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ......... I have already put zkclient-0.5.jar under <flink_root_dir>/lib. Can anyone shed some light on this? Thanks for your help! Wendong -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-cannot-find-org-I0Itec-zkclient-serialize-ZkSerializer-tp2199.html Sent from the Apache Flink User Mailing List archive at Nabble.com.