Hi to all,

i am trying to make Flink to work with Kafka but i always have the following 
exception. It works perfect on my laptop but when i try to use it on the 
cluster it always fails. 

java.lang.Exception
        at 
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
        at 
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
        at 
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
        at 
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:759)
        at 
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getMissingOffsetsFromKafka(LegacyFetcher.java:712)
        at 
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:462)


Here is my pom.xml if it helps with the error

        <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-core</artifactId>
                        <version>1.0.0</version>
                </dependency>

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients_2.10</artifactId>
                        <version>1.0.0</version>
                </dependency>


                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.10</artifactId>
                        <version>1.0.0</version>
                </dependency>


                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
                        <version>1.0.0</version>
                </dependency>


        </dependencies>

Best regards,
Stefanos

Reply via email to