Hi Aljoscha,

You are the best. Thank you very much. It is working now.
Best regards
Hawin

On Fri, Jun 26, 2015 at 12:28 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> could you please try replacing JavaDefaultStringSchema() with
> SimpleStringSchema() in your first example, the one where you get this
> exception:
> org.apache.commons.lang3.SerializationException:
> java.io.StreamCorruptedException: invalid stream header: 68617769
>
> Cheers,
> Aljoscha
>
> On Fri, 26 Jun 2015 at 08:21 Hawin Jiang <hawin.ji...@gmail.com> wrote:
>
>> Dear Marton
>>
>> Here are some errors when I run KafkaProducerExample.java from Eclipse:
>>
>> kafka.common.KafkaException: fetching topic metadata for topics [Set(flink-kafka-topic)] from broker [ArrayBuffer(id:0,host:192.168.0.112,port:2181)] failed
>>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>   at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>   at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
>>   at kafka.utils.Utils$.swallow(Utils.scala:172)
>>   at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>   at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
>>   at kafka.producer.Producer.send(Producer.scala:77)
>>   at kafka.javaapi.producer.Producer.send(Producer.scala:33)
>>   at org.apache.flink.streaming.connectors.kafka.api.KafkaSink.invoke(KafkaSink.java:183)
>>   at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:102)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>   at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>   at kafka.utils.Utils$.read(Utils.scala:381)
>>   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>>   at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>   at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>
>> On Thu, Jun 25, 2015 at 11:06 PM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>
>>> Dear Marton
>>>
>>> I have upgraded my Flink to 0.9.0, but I could not consume data from Kafka with Flink.
>>> I have fully followed your example.
>>> Please help me.
>>> Thanks.
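The invalid stream header 68617769 decodes to the ASCII bytes "hawi", presumably the first characters of the raw message payload. JavaDefaultStringSchema tries to read each Kafka message as a Java-serialized object, so it fails on plain string bytes; SimpleStringSchema reads the bytes as a string directly. A minimal sketch of the suggested fix applied to the consumer code quoted below, assuming the Flink 0.9 connector API used in this thread (the ZooKeeper address and topic are placeholders):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaConsumeExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);

            // SimpleStringSchema instead of JavaDefaultStringSchema: the messages
            // are raw string bytes, not Java-serialized objects.
            DataStream<String> kafkaStream = env.addSource(
                    new KafkaSource<String>("localhost:2181", "flink-kafka-topic",
                            new SimpleStringSchema()));
            kafkaStream.print();

            env.execute();
        }
    }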
>>>
>>> Here is my code:
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
>>>
>>> DataStream<String> kafkaStream = env
>>>     .addSource(new KafkaSource<String>(host + ":" + port, topic,
>>>         new JavaDefaultStringSchema()));
>>> kafkaStream.print();
>>>
>>> env.execute();
>>>
>>> Here are some errors:
>>>
>>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutting down
>>> 15/06/25 22:57:52 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
>>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Stopped
>>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutdown completed
>>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1435298169147] All connections stopped
>>> 15/06/25 22:57:52 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
>>> 15/06/25 22:57:52 INFO zookeeper.ZooKeeper: Session: 0x14e2e5b2dad000a closed
>>> 15/06/25 22:57:52 INFO consumer.ZookeeperConsumerConnector: [flink-group_hawin-1435298168910-10520844], ZKConsumerConnector shutdown completed in 40 ms
>>> 15/06/25 22:57:52 ERROR tasks.SourceStreamTask: Custom Source -> Stream Sink (3/4) failed
>>> org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769
>>>   at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
>>>   at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
>>>   at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:40)
>>>   at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:24)
>>>   at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.run(KafkaSource.java:193)
>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
>>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>   at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.StreamCorruptedException: invalid stream header: 68617769
>>>   at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
>>>   at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
>>>   at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
>>>   ... 8 more
>>>
>>> On Tue, Jun 23, 2015 at 6:31 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>
>>>> Dear Hawin,
>>>>
>>>> Sorry, I have managed to link to a pom that has been changed in the meantime. But we have added a section to our doc clarifying your question. [1]
>>>> Since then Stephan has proposed an even nicer solution that did not make it into the doc yet: if you start from our quickstart pom [2] and add your dependencies to it, simply executing 'mvn package -Pbuild-jar' gives you a jar with all the code that is needed to run it on the cluster, but not more. See [3] for more on the quickstart.
>>>>
>>>> [1] http://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
>>>> [2] https://github.com/apache/flink/blob/master/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
>>>> [3] http://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html
>>>>
>>>> On Tue, Jun 23, 2015 at 6:48 AM, Ashutosh Kumar <ashutosh.disc...@gmail.com> wrote:
>>>>
>>>>> I use the following dependencies and it works fine:
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-java</artifactId>
>>>>>     <version>0.9-SNAPSHOT</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-clients</artifactId>
>>>>>     <version>0.9-SNAPSHOT</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-streaming-core</artifactId>
>>>>>     <version>0.9-SNAPSHOT</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-connector-kafka</artifactId>
>>>>>     <version>0.9-SNAPSHOT</version>
>>>>> </dependency>
>>>>>
>>>>> On Mon, Jun 22, 2015 at 10:07 PM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>>>>
>>>>>> Hi Marton
>>>>>>
>>>>>> Do I have to add the whole pom.xml file, or just the plugin below?
>>>>>> I saw that L286 to L296 of that pom.xml no longer contain the right information.
>>>>>> Thanks.
>>>>>>
>>>>>> <plugin>
>>>>>>     <groupId>org.apache.maven.plugins</groupId>
>>>>>>     <artifactId>maven-assembly-plugin</artifactId>
>>>>>>     <version>2.4</version>
>>>>>>     <configuration>
>>>>>>         <descriptorRefs>
>>>>>>             <descriptorRef>jar-with-dependencies</descriptorRef>
>>>>>>         </descriptorRefs>
>>>>>>     </configuration>
>>>>>> </plugin>
>>>>>>
>>>>>> On Thu, Jun 11, 2015 at 1:43 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>>>>
>>>>>>> As for locally, I meant the machine that you use for development, to see whether this works without parallelism. :-) No need to install stuff on your Namenode, of course.
>>>>>>> Installing Kafka on a machine and having the Kafka Java dependencies available for Flink are two very different things. Try adding the following [1] to your maven pom, then execute 'mvn assembly:assembly'; this will produce a fat jar suffixed jar-with-dependencies.jar. You should be able to run the example from that.
>>>>>>>
>>>>>>> [1] https://github.com/mbalassi/flink-dataflow/blob/master/pom.xml#L286-296
>>>>>>>
>>>>>>> On Thu, Jun 11, 2015 at 10:32 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Dear Marton
>>>>>>>>
>>>>>>>> What do you mean by running it locally from Eclipse with 'Run'?
>>>>>>>> Do you want me to run it on the Namenode?
>>>>>>>> But my Namenode doesn't have Kafka installed; I only installed Kafka on my data node servers.
>>>>>>>> Do I need to install or copy the Kafka jar onto the Namenode? Actually, I don't want to install everything on the Namenode server.
>>>>>>>> Please advise me.
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> My Flink and Hadoop cluster info is as below:
>>>>>>>>
>>>>>>>> Flink on NameNode
>>>>>>>> Kafka, Zookeeper and Flink slave1 on Datanode1
>>>>>>>> Kafka, Zookeeper and Flink slave2 on Datanode2
>>>>>>>> Kafka, Zookeeper and Flink slave3 on Datanode3
>>>>>>>>
>>>>>>>> On Thu, Jun 11, 2015 at 1:16 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Dear Hawin,
>>>>>>>>>
>>>>>>>>> No problem, I am glad that you are giving our Kafka connector a try. :)
>>>>>>>>> The dependencies listed look good. Can you run the example locally from Eclipse with 'Run'? I suspect that maybe your Flink cluster does not have access to the kafka dependency then.
>>>>>>>>>
>>>>>>>>> As a quick test you could copy the kafka jars to the lib folder of your Flink distribution on all the machines in your cluster. Everything that is there goes to the classpath of Flink. Another workaround would be to build a fat jar for your project containing all the dependencies with 'mvn assembly:assembly'. Neither of these is beautiful, but they would help track down the root cause.
>>>>>>>>>
>>>>>>>>> On Thu, Jun 11, 2015 at 10:04 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Dear Marton
>>>>>>>>>>
>>>>>>>>>> Thanks for supporting again.
>>>>>>>>>> I am running these examples in the same project and I am using the Eclipse IDE to submit them to my Flink cluster.
>>>>>>>>>>
>>>>>>>>>> Here are my dependencies:
>>>>>>>>>> ******************************************************************************
>>>>>>>>>> <dependencies>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>junit</groupId>
>>>>>>>>>>         <artifactId>junit</artifactId>
>>>>>>>>>>         <version>4.12</version>
>>>>>>>>>>         <scope>test</scope>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>>>>         <artifactId>flink-java</artifactId>
>>>>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>>>>         <artifactId>flink-clients</artifactId>
>>>>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>>>>         <artifactId>flink-streaming-connectors</artifactId>
>>>>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>>>>         <artifactId>flink-streaming-core</artifactId>
>>>>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.kafka</groupId>
>>>>>>>>>>         <artifactId>kafka_2.10</artifactId>
>>>>>>>>>>         <version>0.8.2.1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.kafka</groupId>
>>>>>>>>>>         <artifactId>kafka-clients</artifactId>
>>>>>>>>>>         <version>0.8.2.1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>>>>         <artifactId>hadoop-hdfs</artifactId>
>>>>>>>>>>         <version>2.6.0</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>>>>         <artifactId>hadoop-auth</artifactId>
>>>>>>>>>>         <version>2.6.0</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>>>>         <artifactId>hadoop-common</artifactId>
>>>>>>>>>>         <version>2.6.0</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>>>>         <artifactId>hadoop-core</artifactId>
>>>>>>>>>>         <version>1.2.1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>> </dependencies>
>>>>>>>>>> *****************************************************************************************
>>>>>>>>>>
>>>>>>>>>> Best regards
>>>>>>>>>> Email: hawin.ji...@gmail.com
>>>>>>>>>>
>>>>>>>>>> From: Márton Balassi [mailto:balassi.mar...@gmail.com]
>>>>>>>>>> Sent: Thursday, June 11, 2015 12:58 AM
>>>>>>>>>> To: user@flink.apache.org
>>>>>>>>>> Subject: Re: Kafka0.8.2.1 + Flink0.9.0 issue
>>>>>>>>>>
>>>>>>>>>> Dear Hawin,
>>>>>>>>>>
>>>>>>>>>> This looks like a dependency issue: the java compiler does not find the kafka dependency. How are you trying to run this example? Is it from an IDE, or are you submitting it to a Flink cluster with bin/flink run? How do you define your dependencies; do you use maven or sbt, for instance?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Marton
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 11, 2015 at 9:43 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
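A closing note on the producer error further up in the thread: the metadata fetch failure lists the broker as 192.168.0.112:2181, which is ZooKeeper's default client port, while Kafka brokers listen on 9092 by default. Assuming the KafkaSink constructor in this 0.9 API takes a broker list as its first argument, pointing it at ZooKeeper would explain the "Received -1 when reading from channel" failure. A minimal sketch under that assumption (broker address and topic are placeholders):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaProduceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironment();

            DataStream<String> messages = env.fromElements("hello flink-kafka");

            // The sink takes the Kafka broker address (default port 9092), not the
            // ZooKeeper address (port 2181) that appears in the stack trace above.
            messages.addSink(new KafkaSink<String>(
                    "192.168.0.112:9092", "flink-kafka-topic", new SimpleStringSchema()));

            env.execute();
        }
    }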