Hi,
Last week I successfully deployed a Flink program that gets data from a
Kafka broker on my local machine.
Now I'm trying to do the same thing, but with the Kafka broker running on
a cluster.

I didn't change a single line of code; I report it here:

DataStream<Tuple2<String, JSONLDObject>> stream = env
        .addSource(new FlinkKafkaConsumer010<>(TOPIC,
                new CustomDeserializer(),
                properties))
        .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
        .keyBy(0);

The only thing I changed is the Kafka IP.
The data model obviously has not changed.
Unfortunately, now when I start the Flink program I get this:
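For reference, the consumer properties look roughly like this (a sketch, not my exact code; the broker host and group id are the ones that appear in the log below):

```java
import java.util.Properties;

// Sketch of the consumer configuration passed to FlinkKafkaConsumer010.
// The bootstrap.servers value is the only thing I changed when moving
// from the local broker to the cluster.
public class ConsumerConfigSketch {
    public static Properties buildProperties() {
        Properties properties = new Properties();
        // Cluster broker instead of the previous localhost:9092
        properties.setProperty("bootstrap.servers", "giordano-1-4-200:9092");
        properties.setProperty("group.id", "groupId");
        return properties;
    }
}
```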

INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.10.0.1
12:30:48,446 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : a7a17cdec9eaa6c5
12:30:48,625 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator giordano-1-4-200:9092 (id: 2147483647 rack: null) for group groupId.
12:30:48,626 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - *Marking the coordinator giordano-1-4-200:9092 (id: 2147483647 rack: null) dead for group groupId*

I bolded the line that worries me.

Also, no data is retrieved from Kafka, although Flink continues to perform
checkpointing etc. normally...

Any ideas?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-tp14822.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
