Dear Marton,

I have upgraded my Flink to 0.9.0, but I still cannot consume data from
Kafka with Flink.
I have followed your example exactly.
Please help me.
Thanks.


Here is my code:

StreamExecutionEnvironment env = StreamExecutionEnvironment
        .createLocalEnvironment()
        .setParallelism(4);

DataStream<String> kafkaStream = env
        .addSource(new KafkaSource<String>(host + ":" + port, topic,
                new JavaDefaultStringSchema()));
kafkaStream.print();

env.execute();


Here are some errors:

15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutting down
15/06/25 22:57:52 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Stopped
15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutdown completed
15/06/25 22:57:52 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1435298169147] All connections stopped
15/06/25 22:57:52 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
15/06/25 22:57:52 INFO zookeeper.ZooKeeper: Session: 0x14e2e5b2dad000a closed
15/06/25 22:57:52 INFO consumer.ZookeeperConsumerConnector: [flink-group_hawin-1435298168910-10520844], ZKConsumerConnector shutdown completed in 40 ms
15/06/25 22:57:52 ERROR tasks.SourceStreamTask: Custom Source -> Stream Sink (3/4) failed
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:40)
    at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:24)
    at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.run(KafkaSource.java:193)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid stream header: 68617769
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
    ... 8 more
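
One observation: the invalid stream header 68617769 decodes to the ASCII
string "hawi", so the topic seems to contain plain UTF-8 strings, while
JavaDefaultStringSchema expects Java-serialized ones (the stack trace shows
it deserializes via SerializationUtils). Below is a minimal, untested
producer sketch that would match that schema; the broker address, topic and
message are placeholders, not my actual setup:

import java.util.Properties;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class JavaSerializedStringProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker list; adjust to your cluster.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<byte[], byte[]>(props);
        // JavaDefaultStringSchema reads records with
        // SerializationUtils.deserialize, so the payload must be a
        // Java-serialized String, not raw UTF-8 bytes.
        byte[] payload = SerializationUtils.serialize("test message");
        producer.send(new ProducerRecord<byte[], byte[]>("test", payload));
        producer.close();
    }
}

Alternatively, if the producer cannot be changed, a schema that decodes raw
UTF-8 (e.g. SimpleStringSchema from the same package, if it is available in
this version) would avoid Java serialization entirely.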

On Tue, Jun 23, 2015 at 6:31 AM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> Dear Hawin,
>
> Sorry, I have managed to link to a pom that has been changed in the
> meantime. But we have added a section to our docs clarifying your question
> [1]. Since then, Stephan has proposed an even nicer solution that has not
> made it into the docs yet: if you start from our quickstart pom [2] and
> add your dependencies to it, simply executing 'mvn package -Pbuild-jar'
> gives you a jar with all the code needed to run on the cluster, but
> nothing more. See [3] for more on the quickstart; a rough sketch of the
> workflow follows the links below.
>
> [1]
> http://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
> [2]
> https://github.com/apache/flink/blob/master/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
> [3]
> http://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html
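>
> (A sketch of that workflow; the archetype version and jar name below are
> examples only, check the quickstart page for the exact values:)
>
> mvn archetype:generate \
>   -DarchetypeGroupId=org.apache.flink \
>   -DarchetypeArtifactId=flink-quickstart-java \
>   -DarchetypeVersion=0.9.0
> # add your dependencies to the generated pom.xml, then:
> mvn package -Pbuild-jar
> # submit the resulting jar; the exact name depends on your artifactId:
> bin/flink run target/your-project-0.1.jar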
>
> On Tue, Jun 23, 2015 at 6:48 AM, Ashutosh Kumar <
> ashutosh.disc...@gmail.com> wrote:
>
>> I use the following dependencies and it works fine:
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-java</artifactId>
>>     <version>0.9-SNAPSHOT</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-clients</artifactId>
>>     <version>0.9-SNAPSHOT</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-core</artifactId>
>>     <version>0.9-SNAPSHOT</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-connector-kafka</artifactId>
>>     <version>0.9-SNAPSHOT</version>
>> </dependency>
>>
>> On Mon, Jun 22, 2015 at 10:07 PM, Hawin Jiang <hawin.ji...@gmail.com>
>> wrote:
>>
>>> Hi Marton,
>>>
>>> Do I have to add the whole pom.xml file, or just the plugin below?
>>> I saw that L286 to L296 are no longer the correct information in pom.xml.
>>> Thanks.
>>>
>>>
>>>
>>> <plugin>
>>>     <groupId>org.apache.maven.plugins</groupId>
>>>     <artifactId>maven-assembly-plugin</artifactId>
>>>     <version>2.4</version>
>>>     <configuration>
>>>         <descriptorRefs>
>>>             <descriptorRef>jar-with-dependencies</descriptorRef>
>>>         </descriptorRefs>
>>>     </configuration>
>>> </plugin>
>>>
>>> On Thu, Jun 11, 2015 at 1:43 AM, Márton Balassi <
>>> balassi.mar...@gmail.com> wrote:
>>>
>>>> By "locally" I meant the machine that you use for development, to see
>>>> whether this works without parallelism. :-) No need to install stuff on
>>>> your Namenode, of course.
>>>> Installing Kafka on a machine and having the Kafka Java dependencies
>>>> available for Flink are two very different things. Try adding the
>>>> following [1] to your Maven pom, then execute 'mvn assembly:assembly';
>>>> this will produce a fat jar suffixed jar-with-dependencies.jar. You
>>>> should be able to run the example from that.
>>>>
>>>> [1]
>>>> https://github.com/mbalassi/flink-dataflow/blob/master/pom.xml#L286-296
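>>>>
>>>> (For example, assuming an artifactId of 'your-project'; the jar name
>>>> is illustrative:)
>>>>
>>>> mvn assembly:assembly
>>>> # produces target/your-project-0.1-jar-with-dependencies.jar
>>>> bin/flink run target/your-project-0.1-jar-with-dependencies.jar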
>>>>
>>>> On Thu, Jun 11, 2015 at 10:32 AM, Hawin Jiang <hawin.ji...@gmail.com>
>>>> wrote:
>>>>
>>>>> Dear Marton,
>>>>>
>>>>> What do you mean by running it locally from Eclipse with 'Run'?
>>>>> Do you want me to run it on the Namenode?
>>>>> My Namenode does not have Kafka installed; I only installed Kafka on
>>>>> my datanode servers.
>>>>> Do I need to install or copy the Kafka jars onto the Namenode?
>>>>> Actually, I don't want to install everything on the Namenode server.
>>>>> Please advise.
>>>>> Thanks.
>>>>>
>>>>>
>>>>> My Flink and Hadoop cluster layout is as follows:
>>>>>
>>>>> Flink on Namenode
>>>>> Kafka, Zookeeper and Flink slave1 on Datanode1
>>>>> Kafka, Zookeeper and Flink slave2 on Datanode2
>>>>> Kafka, Zookeeper and Flink slave3 on Datanode3
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 11, 2015 at 1:16 AM, Márton Balassi <
>>>>> balassi.mar...@gmail.com> wrote:
>>>>>
>>>>>> Dear Hawin,
>>>>>>
>>>>>> No problem, I am glad that you are giving our Kafka connector a try.
>>>>>> :)
>>>>>> The dependencies listed look good. Can you run the example locally
>>>>>> from Eclipse with 'Run'? I suspect that your Flink cluster may not
>>>>>> have access to the Kafka dependency.
>>>>>>
>>>>>> As a quick test you could copy the Kafka jars to the lib folder of
>>>>>> your Flink distribution on all the machines in your cluster;
>>>>>> everything that is there goes onto Flink's classpath. Another
>>>>>> workaround would be to build a fat jar for your project containing
>>>>>> all the dependencies with 'mvn assembly:assembly'. Neither of these
>>>>>> is beautiful, but they would help track down the root cause.
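>>>>>>
>>>>>> (Roughly, for the first workaround; the paths and versions below are
>>>>>> examples only:)
>>>>>>
>>>>>> # on every machine in the cluster:
>>>>>> cp $KAFKA_HOME/libs/kafka_2.10-0.8.2.1.jar $FLINK_HOME/lib/
>>>>>> cp $KAFKA_HOME/libs/kafka-clients-0.8.2.1.jar $FLINK_HOME/lib/
>>>>>> # then restart Flink so the jars are picked up on the classpath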
>>>>>>
>>>>>> On Thu, Jun 11, 2015 at 10:04 AM, Hawin Jiang <hawin.ji...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Dear Marton,
>>>>>>>
>>>>>>> Thanks for your support again.
>>>>>>> I am running these examples in the same project, and I am using the
>>>>>>> Eclipse IDE to submit them to my Flink cluster.
>>>>>>>
>>>>>>> Here are my dependencies:
>>>>>>>
>>>>>>> ******************************************************************************
>>>>>>>
>>>>>>> <dependencies>
>>>>>>>     <dependency>
>>>>>>>         <groupId>junit</groupId>
>>>>>>>         <artifactId>junit</artifactId>
>>>>>>>         <version>4.12</version>
>>>>>>>         <scope>test</scope>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>         <artifactId>flink-java</artifactId>
>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>         <artifactId>flink-clients</artifactId>
>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>         <artifactId>flink-streaming-connectors</artifactId>
>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>         <artifactId>flink-streaming-core</artifactId>
>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.kafka</groupId>
>>>>>>>         <artifactId>kafka_2.10</artifactId>
>>>>>>>         <version>0.8.2.1</version>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.kafka</groupId>
>>>>>>>         <artifactId>kafka-clients</artifactId>
>>>>>>>         <version>0.8.2.1</version>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>         <artifactId>hadoop-hdfs</artifactId>
>>>>>>>         <version>2.6.0</version>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>         <artifactId>hadoop-auth</artifactId>
>>>>>>>         <version>2.6.0</version>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>         <artifactId>hadoop-common</artifactId>
>>>>>>>         <version>2.6.0</version>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>         <artifactId>hadoop-core</artifactId>
>>>>>>>         <version>1.2.1</version>
>>>>>>>     </dependency>
>>>>>>> </dependencies>
>>>>>>>
>>>>>>> *****************************************************************************************
>>>>>>>
>>>>>>> Best regards
>>>>>>>
>>>>>>> Email: hawin.ji...@gmail.com
>>>>>>>
>>>>>>> From: Márton Balassi [mailto:balassi.mar...@gmail.com]
>>>>>>> Sent: Thursday, June 11, 2015 12:58 AM
>>>>>>> To: user@flink.apache.org
>>>>>>> Subject: Re: Kafka0.8.2.1 + Flink0.9.0 issue
>>>>>>>
>>>>>>> Dear Hawin,
>>>>>>>
>>>>>>> This looks like a dependency issue: the Java compiler does not find
>>>>>>> the Kafka dependency. How are you trying to run this example? Is it
>>>>>>> from an IDE, or are you submitting it to a Flink cluster with
>>>>>>> 'bin/flink run'? How do you define your dependencies? Do you use
>>>>>>> Maven or sbt, for instance?
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Marton
>>>>>>>
>>>>>>> On Thu, Jun 11, 2015 at 9:43 AM, Hawin Jiang <hawin.ji...@gmail.com>
>>>>>>> wrote: