Hi,

could you please try replacing JavaDefaultStringSchema() with
SimpleStringSchema() in your first example, the one where you get this
exception:

org.apache.commons.lang3.SerializationException:
java.io.StreamCorruptedException: invalid stream header: 68617769
Cheers,
Aljoscha

On Fri, 26 Jun 2015 at 08:21 Hawin Jiang <hawin.ji...@gmail.com> wrote:

> Dear Marton
>
> Here are some errors when I run KafkaProducerExample.java from Eclipse.
>
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(flink-kafka-topic)] from broker
> [ArrayBuffer(id:0,host:192.168.0.112,port:2181)] failed
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>   at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>   at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
>   at kafka.utils.Utils$.swallow(Utils.scala:172)
>   at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>   at kafka.utils.Utils$.swallowError(Utils.scala:45)
>   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
>   at kafka.producer.Producer.send(Producer.scala:77)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:33)
>   at org.apache.flink.streaming.connectors.kafka.api.KafkaSink.invoke(KafkaSink.java:183)
>   at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:102)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException: Received -1 when reading from channel,
> socket has likely been closed.
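[Editor's note, not from the thread: the broker list in the trace above points at port 2181, which is ZooKeeper's default client port; Kafka 0.8 brokers listen on port 9092 by default. ZooKeeper accepting the connection and then closing it would explain the EOFException. A minimal sketch of the producer setting involved, with a hypothetical broker address:]

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 'metadata.broker.list' (Kafka 0.8 old producer) must name Kafka
        // brokers (default port 9092), not the ZooKeeper ensemble (default
        // port 2181). The trace shows the producer dialing 192.168.0.112:2181.
        props.setProperty("metadata.broker.list", "192.168.0.112:9092"); // hypothetical address
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```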
>   at kafka.utils.Utils$.read(Utils.scala:381)
>   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>   at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> On Thu, Jun 25, 2015 at 11:06 PM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>
>> Dear Marton
>>
>> I have upgraded my Flink to 0.9.0, but I could not consume data from
>> Kafka with Flink. I have fully followed your example.
>> Please help me. Thanks.
>>
>> Here is my code:
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
>>
>> DataStream<String> kafkaStream = env
>>     .addSource(new KafkaSource<String>(host + ":" + port, topic,
>>         new JavaDefaultStringSchema()));
>> kafkaStream.print();
>>
>> env.execute();
>>
>> Here are some errors:
>>
>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread:
>> [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutting down
>> 15/06/25 22:57:52 INFO consumer.SimpleConsumer: Reconnect due to socket
>> error: java.nio.channels.ClosedByInterruptException
>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread:
>> [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Stopped
>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread:
>> [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutdown completed
>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherManager:
>> [ConsumerFetcherManager-1435298169147] All connections stopped
>> 15/06/25 22:57:52 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
>> 15/06/25 22:57:52 INFO zookeeper.ZooKeeper: Session: 0x14e2e5b2dad000a closed
>> 15/06/25 22:57:52 INFO consumer.ZookeeperConsumerConnector:
>> [flink-group_hawin-1435298168910-10520844], ZKConsumerConnector shutdown
>> completed in 40 ms
>> 15/06/25 22:57:52 ERROR tasks.SourceStreamTask: Custom Source -> Stream Sink (3/4) failed
>> org.apache.commons.lang3.SerializationException:
>> java.io.StreamCorruptedException: invalid stream header: 68617769
>>   at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
>>   at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
>>   at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:40)
>>   at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:24)
>>   at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.run(KafkaSource.java:193)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>   at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.StreamCorruptedException: invalid stream header: 68617769
>>   at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
>>   at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
>>   at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
>>   ... 8 more
>>
>> On Tue, Jun 23, 2015 at 6:31 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>
>>> Dear Hawin,
>>>
>>> Sorry, I have managed to link to a pom that has been changed in the
>>> meantime. But we have added a section to our doc clarifying your question.
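[Editor's note: the "invalid stream header: 68617769" above is telling. The bytes 0x68 0x61 0x77 0x69 are ASCII "hawi", i.e. the start of the plain-text message itself, while Java deserialization expects the serialization magic 0xACED. A minimal stdlib sketch of the mismatch, assuming JavaDefaultStringSchema goes through Java deserialization (as the SerializationUtils frames suggest) and SimpleStringSchema decodes raw bytes:]

```java
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;

public class StreamHeaderDemo {
    public static void main(String[] args) throws Exception {
        // A raw Kafka message: a plain UTF-8 string, not a serialized object.
        byte[] msg = "hawin".getBytes(StandardCharsets.UTF_8);

        // The first four bytes are exactly the "invalid stream header" from the log.
        System.out.printf("%02x%02x%02x%02x%n", msg[0], msg[1], msg[2], msg[3]); // 68617769

        // Java deserialization rejects any stream not starting with 0xACED.
        try {
            new ObjectInputStream(new ByteArrayInputStream(msg));
        } catch (java.io.StreamCorruptedException e) {
            System.out.println(e.getMessage()); // invalid stream header: 68617769
        }

        // SimpleStringSchema-style handling: just decode the bytes as text.
        System.out.println(new String(msg, StandardCharsets.UTF_8)); // hawin
    }
}
```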
>>> [1] Since then Stephan has proposed an even nicer solution that has not
>>> made it into the doc yet: if you start from our quickstart pom [2] and
>>> add your dependencies to that, simply executing 'mvn package -Pbuild-jar'
>>> gives you a jar with all the code that is needed to run it on the
>>> cluster, but nothing more. See [3] for more on the quickstart.
>>>
>>> [1] http://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
>>> [2] https://github.com/apache/flink/blob/master/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
>>> [3] http://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html
>>>
>>> On Tue, Jun 23, 2015 at 6:48 AM, Ashutosh Kumar <ashutosh.disc...@gmail.com> wrote:
>>>
>>>> I use the following dependencies and it works fine:
>>>>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-java</artifactId>
>>>>   <version>0.9-SNAPSHOT</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-clients</artifactId>
>>>>   <version>0.9-SNAPSHOT</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-streaming-core</artifactId>
>>>>   <version>0.9-SNAPSHOT</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-connector-kafka</artifactId>
>>>>   <version>0.9-SNAPSHOT</version>
>>>> </dependency>
>>>>
>>>> On Mon, Jun 22, 2015 at 10:07 PM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>>>
>>>>> Hi Marton
>>>>>
>>>>> Do I have to add the whole pom.xml file, or just the plugin below?
>>>>> I saw that L286 to L296 in that pom.xml are not correct.
>>>>> Thanks.
>>>>>
>>>>> <plugin>
>>>>>   <groupId>org.apache.maven.plugins</groupId>
>>>>>   <artifactId>maven-assembly-plugin</artifactId>
>>>>>   <version>2.4</version>
>>>>>   <configuration>
>>>>>     <descriptorRefs>
>>>>>       <descriptorRef>jar-with-dependencies</descriptorRef>
>>>>>     </descriptorRefs>
>>>>>   </configuration>
>>>>> </plugin>
>>>>>
>>>>> On Thu, Jun 11, 2015 at 1:43 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>>>
>>>>>> By locally I meant the machine that you use for development, to see
>>>>>> whether this works without parallelism. :-) No need to install stuff
>>>>>> on your Namenode, of course.
>>>>>> Installing Kafka on a machine and having the Kafka Java dependencies
>>>>>> available for Flink are two very different things. Try adding the
>>>>>> following [1] to your maven pom. Then execute 'mvn assembly:assembly';
>>>>>> this will produce a fat jar suffixed jar-with-dependencies.jar. You
>>>>>> should be able to run the example from that.
>>>>>>
>>>>>> [1] https://github.com/mbalassi/flink-dataflow/blob/master/pom.xml#L286-296
>>>>>>
>>>>>> On Thu, Jun 11, 2015 at 10:32 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>>>>>
>>>>>>> Dear Marton
>>>>>>>
>>>>>>> What do you mean by running it locally from Eclipse with 'Run'?
>>>>>>> Do you want me to run it on the Namenode?
>>>>>>> But my Namenode does not have Kafka installed; I only installed Kafka
>>>>>>> on my data node servers.
>>>>>>> Do I need to install or copy the Kafka jar on the Namenode? Actually,
>>>>>>> I don't want to install everything on the Namenode server.
>>>>>>> Please advise me.
>>>>>>> Thanks.
>>>>>>>
>>>>>>> My Flink and Hadoop cluster info is as below.
>>>>>>>
>>>>>>> Flink on NameNode
>>>>>>> Kafka, Zookeeper and Flink slave1 on Datanode1
>>>>>>> Kafka, Zookeeper and Flink slave2 on Datanode2
>>>>>>> Kafka, Zookeeper and Flink slave3 on Datanode3
>>>>>>>
>>>>>>> On Thu, Jun 11, 2015 at 1:16 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Dear Hawin,
>>>>>>>>
>>>>>>>> No problem, I am glad that you are giving our Kafka connector a try. :)
>>>>>>>> The dependencies listed look good. Can you run the example locally
>>>>>>>> from Eclipse with 'Run'? I suspect that maybe your Flink cluster does
>>>>>>>> not have access to the kafka dependency then.
>>>>>>>>
>>>>>>>> As a quick test you could copy the kafka jars to the lib folder of
>>>>>>>> your Flink distribution on all the machines in your cluster; everything
>>>>>>>> that is there goes on the classpath of Flink. Another workaround would
>>>>>>>> be to build a fat jar for your project containing all the dependencies
>>>>>>>> with 'mvn assembly:assembly'. Neither of these is beautiful, but they
>>>>>>>> would help track down the root cause.
>>>>>>>>
>>>>>>>> On Thu, Jun 11, 2015 at 10:04 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Dear Marton
>>>>>>>>>
>>>>>>>>> Thanks again for your support.
>>>>>>>>>
>>>>>>>>> I am running these examples in the same project and I am using
>>>>>>>>> the Eclipse IDE to submit them to my Flink cluster.
>>>>>>>>>
>>>>>>>>> Here are my dependencies:
>>>>>>>>>
>>>>>>>>> ******************************************************************************
>>>>>>>>>
>>>>>>>>> <dependencies>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>junit</groupId>
>>>>>>>>>     <artifactId>junit</artifactId>
>>>>>>>>>     <version>4.12</version>
>>>>>>>>>     <scope>test</scope>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>>>     <artifactId>flink-java</artifactId>
>>>>>>>>>     <version>0.9.0-milestone-1</version>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>>>     <artifactId>flink-clients</artifactId>
>>>>>>>>>     <version>0.9.0-milestone-1</version>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>>>     <artifactId>flink-streaming-connectors</artifactId>
>>>>>>>>>     <version>0.9.0-milestone-1</version>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>>>     <artifactId>flink-streaming-core</artifactId>
>>>>>>>>>     <version>0.9.0-milestone-1</version>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>>>>     <artifactId>kafka_2.10</artifactId>
>>>>>>>>>     <version>0.8.2.1</version>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>>>>     <artifactId>kafka-clients</artifactId>
>>>>>>>>>     <version>0.8.2.1</version>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.hadoop</groupId>
>>>>>>>>>     <artifactId>hadoop-hdfs</artifactId>
>>>>>>>>>     <version>2.6.0</version>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.hadoop</groupId>
>>>>>>>>>     <artifactId>hadoop-auth</artifactId>
>>>>>>>>>     <version>2.6.0</version>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.hadoop</groupId>
>>>>>>>>>     <artifactId>hadoop-common</artifactId>
>>>>>>>>>     <version>2.6.0</version>
>>>>>>>>>   </dependency>
>>>>>>>>>   <dependency>
>>>>>>>>>     <groupId>org.apache.hadoop</groupId>
>>>>>>>>>     <artifactId>hadoop-core</artifactId>
>>>>>>>>>     <version>1.2.1</version>
>>>>>>>>>   </dependency>
>>>>>>>>> </dependencies>
>>>>>>>>>
>>>>>>>>> *****************************************************************************************
>>>>>>>>>
>>>>>>>>> Best regards
>>>>>>>>>
>>>>>>>>> Email: hawin.ji...@gmail.com
>>>>>>>>>
>>>>>>>>> From: Márton Balassi [mailto:balassi.mar...@gmail.com]
>>>>>>>>> Sent: Thursday, June 11, 2015 12:58 AM
>>>>>>>>> To: user@flink.apache.org
>>>>>>>>> Subject: Re: Kafka0.8.2.1 + Flink0.9.0 issue
>>>>>>>>>
>>>>>>>>> Dear Hawin,
>>>>>>>>>
>>>>>>>>> This looks like a dependency issue: the java compiler does not find
>>>>>>>>> the kafka dependency. How are you trying to run this example? Is it
>>>>>>>>> from an IDE, or by submitting it to a flink cluster with bin/flink
>>>>>>>>> run? How do you define your dependencies; do you use maven or sbt,
>>>>>>>>> for instance?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Marton
>>>>>>>>>
>>>>>>>>> On Thu, Jun 11, 2015 at 9:43 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
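[Editor's note: Marton's suspicion that the Flink cluster does not have access to the kafka dependency can be probed with a tiny stdlib check. A sketch of mine; the class names are taken from the stack traces earlier in the thread. Run it with the exact classpath the failing code uses, i.e. the IDE run configuration or from inside the jar submitted with bin/flink run:]

```java
// Probes whether the Kafka and Flink connector classes are visible on the
// current JVM's classpath.
public class ClasspathCheck {
    public static void main(String[] args) {
        String[] probes = {
            "kafka.javaapi.producer.Producer",                           // from kafka_2.10
            "org.apache.flink.streaming.connectors.kafka.api.KafkaSink"  // from the Flink Kafka connector
        };
        for (String name : probes) {
            try {
                Class.forName(name);
                System.out.println("FOUND   " + name);
            } catch (ClassNotFoundException e) {
                System.out.println("MISSING " + name);
            }
        }
    }
}
```

If either line prints MISSING on the machines running the job, the fat-jar or lib-folder workarounds above are the way out.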