Dear Marton,

I have upgraded my Flink to 0.9.0, but I still could not consume data from Kafka with Flink. I have followed your example exactly. Please help me. Thanks.
Here is my code:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
    DataStream<String> kafkaStream = env
            .addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
    kafkaStream.print();
    env.execute();

Here are some errors:

15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutting down
15/06/25 22:57:52 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Stopped
15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutdown completed
15/06/25 22:57:52 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1435298169147] All connections stopped
15/06/25 22:57:52 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
15/06/25 22:57:52 INFO zookeeper.ZooKeeper: Session: 0x14e2e5b2dad000a closed
15/06/25 22:57:52 INFO consumer.ZookeeperConsumerConnector: [flink-group_hawin-1435298168910-10520844], ZKConsumerConnector shutdown completed in 40 ms
15/06/25 22:57:52 ERROR tasks.SourceStreamTask: Custom Source -> Stream Sink (3/4) failed
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:40)
    at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:24)
    at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.run(KafkaSource.java:193)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid stream header: 68617769
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
    ... 8 more

On Tue, Jun 23, 2015 at 6:31 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
> Dear Hawin,
>
> Sorry, I have managed to link to a pom that has been changed in the
> meantime. But we have added a section to our doc clarifying your question.
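The invalid stream header 68617769 in the trace above is the UTF-8 encoding of "hawi", i.e. the first four bytes of a plain-text message. JavaDefaultStringSchema deserializes each Kafka message with Java object serialization, which only works if the producer also wrote Java-serialized strings; a topic filled with raw UTF-8 text fails in exactly this way. A minimal plain-Java sketch of the mismatch (the message text "hawin test" is a made-up example):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.nio.charset.StandardCharsets;

public class StreamHeaderDemo {
    // A Java serialization stream always starts with the magic bytes 0xAC 0xED.
    static boolean looksJavaSerialized(byte[] bytes) {
        return bytes.length >= 2
                && (bytes[0] & 0xFF) == 0xAC
                && (bytes[1] & 0xFF) == 0xED;
    }

    // Mimics what JavaDefaultStringSchema does per message: Java deserialization.
    static String deserialize(byte[] bytes) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (String) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // A message produced as raw UTF-8 text, e.g. by a plain console producer.
        byte[] rawText = "hawin test".getBytes(StandardCharsets.UTF_8);

        // The same message produced via Java serialization, which the schema expects.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject("hawin test");
        }

        System.out.println(deserialize(buf.toByteArray())); // round-trips fine

        try {
            deserialize(rawText);
        } catch (StreamCorruptedException e) {
            // Reports the invalid header 68617769, the hex of "hawi".
            System.out.println(e.getMessage());
        }
    }
}
```

If the producer does write plain strings, a schema that decodes raw UTF-8 bytes (for example SimpleStringSchema from the same org.apache.flink.streaming.util.serialization package) would match the data; that the producer writes plain text is an assumption here, not something confirmed in the thread.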
> [1] Since then Stephan has proposed an even nicer solution that did not
> make it into the doc yet: if you start from our quickstart pom and
> add your dependencies to it, then simply executing 'mvn package -Pbuild-jar'
> gives you a jar with all the code that is needed to run it on the
> cluster, but not more. See [3] for more on the quickstart.
>
> [1] http://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
> [2] https://github.com/apache/flink/blob/master/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
> [3] http://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html
>
> On Tue, Jun 23, 2015 at 6:48 AM, Ashutosh Kumar <ashutosh.disc...@gmail.com> wrote:
>
>> I use the following dependencies and it works fine:
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-java</artifactId>
>>   <version>0.9-SNAPSHOT</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-clients</artifactId>
>>   <version>0.9-SNAPSHOT</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-core</artifactId>
>>   <version>0.9-SNAPSHOT</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka</artifactId>
>>   <version>0.9-SNAPSHOT</version>
>> </dependency>
>>
>> On Mon, Jun 22, 2015 at 10:07 PM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>
>>> Hi Marton,
>>>
>>> Should I add the whole pom.xml file, or only the plugin below?
>>> I saw that L286 to L296 are not correct in that pom.xml.
>>> Thanks.
>>> <plugin>
>>>   <groupId>org.apache.maven.plugins</groupId>
>>>   <artifactId>maven-assembly-plugin</artifactId>
>>>   <version>2.4</version>
>>>   <configuration>
>>>     <descriptorRefs>
>>>       <descriptorRef>jar-with-dependencies</descriptorRef>
>>>     </descriptorRefs>
>>>   </configuration>
>>> </plugin>
>>>
>>> On Thu, Jun 11, 2015 at 1:43 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>
>>>> By locally I meant the machine that you use for development, to see
>>>> whether this works without parallelism. :-) No need to install stuff on
>>>> your Namenode of course.
>>>> Installing Kafka on a machine and having the Kafka Java dependencies
>>>> available for Flink are two very different things. Try adding the following
>>>> [1] to your Maven pom. Then execute 'mvn assembly:assembly'; this will
>>>> produce a fat jar suffixed jar-with-dependencies.jar. You should be able
>>>> to run the example from that.
>>>>
>>>> [1] https://github.com/mbalassi/flink-dataflow/blob/master/pom.xml#L286-296
>>>>
>>>> On Thu, Jun 11, 2015 at 10:32 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>>>
>>>>> Dear Marton
>>>>>
>>>>> What do you mean by running locally in Eclipse with 'Run'?
>>>>> Do you want me to run it on the Namenode?
>>>>> But my Namenode does not have Kafka installed. I only installed Kafka on my
>>>>> data node servers.
>>>>> Do I need to install or copy the Kafka jar on the Namenode? Actually, I don't
>>>>> want to install everything on the Namenode server.
>>>>> Please advise me.
>>>>> Thanks.
>>>>>
>>>>> My Flink and Hadoop cluster info is as below:
>>>>>
>>>>> Flink on NameNode
>>>>> Kafka, Zookeeper and Flink slave1 on Datanode1
>>>>> Kafka, Zookeeper and Flink slave2 on Datanode2
>>>>> Kafka, Zookeeper and Flink slave3 on Datanode3
>>>>>
>>>>> On Thu, Jun 11, 2015 at 1:16 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>>>
>>>>>> Dear Hawin,
>>>>>>
>>>>>> No problem, I am glad that you are giving our Kafka connector a try.
>>>>>> :)
>>>>>> The dependencies listed look good. Can you run the example locally
>>>>>> from Eclipse with 'Run'? I suspect that maybe your Flink cluster does not
>>>>>> have access to the Kafka dependency then.
>>>>>>
>>>>>> As a quick test you could copy the Kafka jars to the lib folder of
>>>>>> your Flink distribution on all the machines in your cluster. Everything
>>>>>> that is there goes to the classpath of Flink. Another workaround would be
>>>>>> to build a fat jar for your project containing all the dependencies with
>>>>>> 'mvn assembly:assembly'. Neither of these is beautiful, but they would help
>>>>>> track down the root cause.
>>>>>>
>>>>>> On Thu, Jun 11, 2015 at 10:04 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>>>>>
>>>>>>> Dear Marton
>>>>>>>
>>>>>>> Thanks for supporting again.
>>>>>>> I am running these examples in the same project, and I am using
>>>>>>> Eclipse IDE to submit them to my Flink cluster.
>>>>>>>
>>>>>>> Here are my dependencies:
>>>>>>> ******************************************************************************
>>>>>>> <dependencies>
>>>>>>>   <dependency>
>>>>>>>     <groupId>junit</groupId>
>>>>>>>     <artifactId>junit</artifactId>
>>>>>>>     <version>4.12</version>
>>>>>>>     <scope>test</scope>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>     <artifactId>flink-java</artifactId>
>>>>>>>     <version>0.9.0-milestone-1</version>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>     <artifactId>flink-clients</artifactId>
>>>>>>>     <version>0.9.0-milestone-1</version>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>     <artifactId>flink-streaming-connectors</artifactId>
>>>>>>>     <version>0.9.0-milestone-1</version>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>     <artifactId>flink-streaming-core</artifactId>
>>>>>>>     <version>0.9.0-milestone-1</version>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>>     <artifactId>kafka_2.10</artifactId>
>>>>>>>     <version>0.8.2.1</version>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>>     <artifactId>kafka-clients</artifactId>
>>>>>>>     <version>0.8.2.1</version>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.hadoop</groupId>
>>>>>>>     <artifactId>hadoop-hdfs</artifactId>
>>>>>>>     <version>2.6.0</version>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.hadoop</groupId>
>>>>>>>     <artifactId>hadoop-auth</artifactId>
>>>>>>>     <version>2.6.0</version>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.hadoop</groupId>
>>>>>>>     <artifactId>hadoop-common</artifactId>
>>>>>>>     <version>2.6.0</version>
>>>>>>>   </dependency>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.hadoop</groupId>
>>>>>>>     <artifactId>hadoop-core</artifactId>
>>>>>>>     <version>1.2.1</version>
>>>>>>>   </dependency>
>>>>>>> </dependencies>
>>>>>>> *****************************************************************************************
>>>>>>>
>>>>>>> Best regards
>>>>>>> Email: hawin.ji...@gmail.com
>>>>>>>
>>>>>>> *From:* Márton Balassi
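Two version mismatches stand out in the dependency list above: the Flink artifacts are pinned to 0.9.0-milestone-1 while the cluster runs 0.9.0, and hadoop-core 1.2.1 (a Hadoop 1.x artifact) is mixed with Hadoop 2.6.0 artifacts. A sketch of a more consistent set, assuming the 0.9.0 release artifacts (with the flink-connector-kafka artifact from Ashutosh's list) are what the cluster expects:

```xml
<!-- Sketch: align all Flink artifacts with the 0.9.0 release the cluster runs.
     flink-connector-kafka brings in its own matching Kafka client libraries. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>0.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>0.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-core</artifactId>
  <version>0.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.9.0</version>
</dependency>
```

Dropping hadoop-core 1.2.1 would also avoid classpath clashes with the 2.6.0 Hadoop jars; whether the Hadoop dependencies are needed at all depends on the job.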
>>>>>>> [mailto:balassi.mar...@gmail.com]
>>>>>>> *Sent:* Thursday, June 11, 2015 12:58 AM
>>>>>>> *To:* user@flink.apache.org
>>>>>>> *Subject:* Re: Kafka0.8.2.1 + Flink0.9.0 issue
>>>>>>>
>>>>>>> Dear Hawin,
>>>>>>>
>>>>>>> This looks like a dependency issue: the Java compiler does not find
>>>>>>> the Kafka dependency. How are you trying to run this example? Is it
>>>>>>> from an IDE, or are you submitting it to a Flink cluster with bin/flink run?
>>>>>>> How do you define your dependencies? Do you use Maven or sbt, for instance?
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Marton
>>>>>>>
>>>>>>> On Thu, Jun 11, 2015 at 9:43 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote: