Hi Aljoscha,

You are the best.
Thank you very much.
It is working now.



Best regards
Hawin

On Fri, Jun 26, 2015 at 12:28 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> could you please try replacing JavaDefaultStringSchema() with
> SimpleStringSchema() in your first example, the one where you get this
> exception:
> org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769
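>
> For reference, the relevant line would then look roughly like this (a
> minimal sketch, assuming the same host, port and topic variables as in
> your example):
>
>     DataStream<String> kafkaStream = env
>             .addSource(new KafkaSource<String>(host + ":" + port, topic,
>                     new SimpleStringSchema()));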
>
> Cheers,
> Aljoscha
>
> On Fri, 26 Jun 2015 at 08:21 Hawin Jiang <hawin.ji...@gmail.com> wrote:
>
>> Dear Marton
>>
>>
>> Here are some errors when I run KafkaProducerExample.java from Eclipse.
>>
>> kafka.common.KafkaException: fetching topic metadata for topics
>> [Set(flink-kafka-topic)] from broker
>> [ArrayBuffer(id:0,host:192.168.0.112,port:2181)] failed
>> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>> at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>> at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
>> at kafka.utils.Utils$.swallow(Utils.scala:172)
>> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>> at kafka.utils.Utils$.swallowError(Utils.scala:45)
>> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
>> at kafka.producer.Producer.send(Producer.scala:77)
>> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
>> at org.apache.flink.streaming.connectors.kafka.api.KafkaSink.invoke(KafkaSink.java:183)
>> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:102)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>> at kafka.utils.Utils$.read(Utils.scala:381)
>> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>
>> On Thu, Jun 25, 2015 at 11:06 PM, Hawin Jiang <hawin.ji...@gmail.com>
>> wrote:
>>
>>> Dear Marton
>>>
>>> I have upgraded my Flink to 0.9.0, but I could not consume data from
>>> Kafka with Flink.
>>> I have fully followed your example.
>>> Please help me.
>>> Thanks.
>>>
>>>
>>> Here is my code:
>>>
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
>>> import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
>>>
>>> StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
>>>
>>> DataStream<String> kafkaStream = env
>>>         .addSource(new KafkaSource<String>(host + ":" + port, topic,
>>>                 new JavaDefaultStringSchema()));
>>> kafkaStream.print();
>>>
>>> env.execute();
>>>
>>>
>>> Here are some errors:
>>>
>>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutting down
>>> 15/06/25 22:57:52 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
>>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Stopped
>>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flink-group_hawin-1435298168910-10520844-0-1], Shutdown completed
>>> 15/06/25 22:57:52 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1435298169147] All connections stopped
>>> 15/06/25 22:57:52 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
>>> 15/06/25 22:57:52 INFO zookeeper.ZooKeeper: Session: 0x14e2e5b2dad000a closed
>>> 15/06/25 22:57:52 INFO consumer.ZookeeperConsumerConnector: [flink-group_hawin-1435298168910-10520844], ZKConsumerConnector shutdown completed in 40 ms
>>> 15/06/25 22:57:52 ERROR tasks.SourceStreamTask: Custom Source -> Stream Sink (3/4) failed
>>> org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 68617769
>>> at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
>>> at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
>>> at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:40)
>>> at org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema.deserialize(JavaDefaultStringSchema.java:24)
>>> at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.run(KafkaSource.java:193)
>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.StreamCorruptedException: invalid stream header: 68617769
>>> at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
>>> at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
>>> at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
>>> ... 8 more
>>>
>>> On Tue, Jun 23, 2015 at 6:31 AM, Márton Balassi <
>>> balassi.mar...@gmail.com> wrote:
>>>
>>>> Dear Hawin,
>>>>
>>>> Sorry, I have managed to link to a pom that has been changed in the
>>>> meantime. But we have added a section to our doc clarifying your question
>>>> [1]. Since then Stephan has proposed an even nicer solution that did not
>>>> make it into the doc yet: if you start from our quickstart pom [2] and add
>>>> your dependencies to it, simply executing 'mvn package -Pbuild-jar' gives
>>>> you a jar with all the code needed to run your job on the cluster, but
>>>> nothing more. See [3] for more on the quickstart, and the sketch after
>>>> the links below.
>>>>
>>>> [1]
>>>> http://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
>>>> [2]
>>>> https://github.com/apache/flink/blob/master/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
>>>> [3]
>>>> http://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html
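>>>>
>>>> To illustrate, a minimal sketch assuming the Java quickstart archetype
>>>> for the 0.9.0 release (see [3] for the authoritative steps):
>>>>
>>>>     mvn archetype:generate \
>>>>         -DarchetypeGroupId=org.apache.flink \
>>>>         -DarchetypeArtifactId=flink-quickstart-java \
>>>>         -DarchetypeVersion=0.9.0
>>>>     cd <your-new-project>
>>>>     mvn package -Pbuild-jar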
>>>>
>>>> On Tue, Jun 23, 2015 at 6:48 AM, Ashutosh Kumar <
>>>> ashutosh.disc...@gmail.com> wrote:
>>>>
>>>>> I use the following dependencies and it works fine.
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-java</artifactId>
>>>>>     <version>0.9-SNAPSHOT</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-clients</artifactId>
>>>>>     <version>0.9-SNAPSHOT</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-streaming-core</artifactId>
>>>>>     <version>0.9-SNAPSHOT</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-connector-kafka</artifactId>
>>>>>     <version>0.9-SNAPSHOT</version>
>>>>> </dependency>
>>>>>
>>>>> On Mon, Jun 22, 2015 at 10:07 PM, Hawin Jiang <hawin.ji...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi  Marton
>>>>>>
>>>>>> Do I have to add the whole pom.xml file, or just the plugin as below?
>>>>>> I saw that L286 to L296 of the pom.xml do not contain the correct
>>>>>> information.
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>>
>>>>>> <plugin>
>>>>>>     <groupId>org.apache.maven.plugins</groupId>
>>>>>>     <artifactId>maven-assembly-plugin</artifactId>
>>>>>>     <version>2.4</version>
>>>>>>     <configuration>
>>>>>>         <descriptorRefs>
>>>>>>             <descriptorRef>jar-with-dependencies</descriptorRef>
>>>>>>         </descriptorRefs>
>>>>>>     </configuration>
>>>>>> </plugin>
>>>>>>
>>>>>> On Thu, Jun 11, 2015 at 1:43 AM, Márton Balassi <
>>>>>> balassi.mar...@gmail.com> wrote:
>>>>>>
>>>>>>> By "locally" I meant the machine that you use for development, to see
>>>>>>> whether this works without parallelism. :-) No need to install stuff on
>>>>>>> your Namenode, of course.
>>>>>>> Installing Kafka on a machine and having the Kafka Java dependencies
>>>>>>> available for Flink are two very different things. Try adding the
>>>>>>> following [1] to your maven pom. Then execute 'mvn assembly:assembly';
>>>>>>> this will produce a fat jar suffixed with jar-with-dependencies.jar. You
>>>>>>> should be able to run the example from that (see the sketch below the
>>>>>>> link).
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/mbalassi/flink-dataflow/blob/master/pom.xml#L286-296
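>>>>>>>
>>>>>>> Roughly, assuming a standard project layout (the exact jar name depends
>>>>>>> on your artifactId and version):
>>>>>>>
>>>>>>>     mvn assembly:assembly
>>>>>>>     # produces target/<artifact>-<version>-jar-with-dependencies.jar
>>>>>>>     bin/flink run target/<artifact>-<version>-jar-with-dependencies.jar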
>>>>>>>
>>>>>>> On Thu, Jun 11, 2015 at 10:32 AM, Hawin Jiang <hawin.ji...@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Dear Marton
>>>>>>>>
>>>>>>>> What do you mean by running it locally from Eclipse with 'Run'?
>>>>>>>> Do you want me to run it on the Namenode?
>>>>>>>> But Kafka is not installed on my Namenode; I only installed Kafka on
>>>>>>>> my datanode servers.
>>>>>>>> Do I need to install or copy the Kafka jar onto the Namenode? Actually,
>>>>>>>> I don't want to install everything on the Namenode server.
>>>>>>>> Please advise me.
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>> My Flink and Hadoop cluster info is as below:
>>>>>>>>
>>>>>>>> Flink on Namenode
>>>>>>>> Kafka, Zookeeper and Flink slave1 on Datanode1
>>>>>>>> Kafka, Zookeeper and Flink slave2 on Datanode2
>>>>>>>> Kafka, Zookeeper and Flink slave3 on Datanode3
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 11, 2015 at 1:16 AM, Márton Balassi <
>>>>>>>> balassi.mar...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Dear Hawin,
>>>>>>>>>
>>>>>>>>> No problem, I am glad that you are giving our Kafka connector a
>>>>>>>>> try. :)
>>>>>>>>> The dependencies listed look good. Can you run the example locally
>>>>>>>>> from Eclipse with 'Run'? I suspect that your Flink cluster does not
>>>>>>>>> have access to the Kafka dependency.
>>>>>>>>>
>>>>>>>>> As a quick test you could copy the Kafka jars to the lib folder of
>>>>>>>>> your Flink distribution on all the machines in your cluster; everything
>>>>>>>>> that is there goes onto the classpath of Flink. Another workaround
>>>>>>>>> would be to build a fat jar for your project containing all the
>>>>>>>>> dependencies with 'mvn assembly:assembly'. Neither of these is
>>>>>>>>> beautiful, but they would help track down the root cause.
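>>>>>>>>>
>>>>>>>>> For example (the local repository path, the user and the Flink lib
>>>>>>>>> directory here are just placeholders):
>>>>>>>>>
>>>>>>>>>     scp ~/.m2/repository/org/apache/kafka/kafka_2.10/0.8.2.1/kafka_2.10-0.8.2.1.jar \
>>>>>>>>>         user@datanode1:/opt/flink/lib/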
>>>>>>>>>
>>>>>>>>> On Thu, Jun 11, 2015 at 10:04 AM, Hawin Jiang <
>>>>>>>>> hawin.ji...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Dear Marton
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks for supporting again.
>>>>>>>>>>
>>>>>>>>>> I am running these examples in the same project, and I am using the
>>>>>>>>>> Eclipse IDE to submit them to my Flink cluster.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Here are my dependencies:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ******************************************************************************
>>>>>>>>>>
>>>>>>>>>> <dependencies>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>junit</groupId>
>>>>>>>>>>         <artifactId>junit</artifactId>
>>>>>>>>>>         <version>4.12</version>
>>>>>>>>>>         <scope>test</scope>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>>>>         <artifactId>flink-java</artifactId>
>>>>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>>>>         <artifactId>flink-clients</artifactId>
>>>>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>>>>         <artifactId>flink-streaming-connectors</artifactId>
>>>>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>>>>         <artifactId>flink-streaming-core</artifactId>
>>>>>>>>>>         <version>0.9.0-milestone-1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.kafka</groupId>
>>>>>>>>>>         <artifactId>kafka_2.10</artifactId>
>>>>>>>>>>         <version>0.8.2.1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.kafka</groupId>
>>>>>>>>>>         <artifactId>kafka-clients</artifactId>
>>>>>>>>>>         <version>0.8.2.1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>>>>         <artifactId>hadoop-hdfs</artifactId>
>>>>>>>>>>         <version>2.6.0</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>>>>         <artifactId>hadoop-auth</artifactId>
>>>>>>>>>>         <version>2.6.0</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>>>>         <artifactId>hadoop-common</artifactId>
>>>>>>>>>>         <version>2.6.0</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>     <dependency>
>>>>>>>>>>         <groupId>org.apache.hadoop</groupId>
>>>>>>>>>>         <artifactId>hadoop-core</artifactId>
>>>>>>>>>>         <version>1.2.1</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>> </dependencies>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *****************************************************************************************
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best regards
>>>>>>>>>>
>>>>>>>>>> Email: hawin.ji...@gmail.com
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *From:* Márton Balassi [mailto:balassi.mar...@gmail.com]
>>>>>>>>>> *Sent:* Thursday, June 11, 2015 12:58 AM
>>>>>>>>>> *To:* user@flink.apache.org
>>>>>>>>>> *Subject:* Re: Kafka0.8.2.1 + Flink0.9.0 issue
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Dear Hawin,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This looks like a dependency issue: the Java compiler does not find
>>>>>>>>>> the Kafka dependency. How are you trying to run this example? Is it
>>>>>>>>>> from an IDE, or by submitting it to a Flink cluster with bin/flink
>>>>>>>>>> run? How do you define your dependencies, do you use maven or sbt
>>>>>>>>>> for instance?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Marton
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 11, 2015 at 9:43 AM, Hawin Jiang <
>>>>>>>>>> hawin.ji...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
