Here is my simple program to use Kafka:

import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer,
ProducerRecord}
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.api.common.typeinfo._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new
SimpleStringSchema))
      .addSink(new KafkaSink[String]("localhost:2181", "test", new
JavaDefaultStringSchema))

    env.execute("Test Kafka")
  }
}

in build.sbt:

libraryDependencies ++= Seq("org.apache.flink" % "flink-scala" % "0.9.0",
"org.apache.flink" % "flink-clients" % "0.9.0")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
libraryDependencies += "org.apache.flink" % "flink-streaming-scala" %
"0.9.0"
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" %
"0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "com.101tec" % "zkclient" % "0.5"

I'm using "sbt assembly" to build a fat jar, so the target jar file is
supposed to contain everything. However, there are errors when running the
target jar file:

java.lang.NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer
        at
kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
        at
kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
        at
kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:105)
        at
kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
        at
org.apache.flink.streaming.connectors.kafka.api.KafkaSource.initializeConnection(KafkaSource.java:175)
        at
org.apache.flink.streaming.connectors.kafka.api.KafkaSource.open(KafkaSource.java:207)
        at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:56)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.openOperator(StreamTask.java:158)
        at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:52)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
org.I0Itec.zkclient.serialize.ZkSerializer
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
.........

I already put zkclient-0.5.jar under <flink_root_dir>/lib. Anyone can shed
some light?

Thanks for help!

Wendong



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-cannot-find-org-I0Itec-zkclient-serialize-ZkSerializer-tp2199.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to