Hi, last week I successfully deployed a Flink program that reads data from a Kafka broker on my local machine. Now I'm trying to do the same thing with the Kafka broker moved to a cluster.
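Since only the broker location differs between the two setups, the relevant difference sits in the consumer configuration handed to the Kafka source. Below is a minimal sketch of what such a configuration might look like. It is an illustration under assumptions, not the actual code from this job: the class name `KafkaConsumerProps`, the helper `build`, and the host `kafka-broker.example:9092` are hypothetical placeholders. Only the keys `bootstrap.servers` and `group.id` are standard Kafka consumer settings, and the group name `groupId` is taken from the log output.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only: builds the consumer
// configuration that would be passed to the Kafka source.
class KafkaConsumerProps {

    // bootstrapServers: "host:port" of the broker (the only value that
    // changes when the broker moves from the local machine to a cluster).
    // groupId: the consumer group name.
    static Properties build(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        return properties;
    }

    public static void main(String[] args) {
        // Placeholder address; substitute the real broker host and port.
        Properties properties = build("kafka-broker.example:9092", "groupId");
        System.out.println(properties.getProperty("bootstrap.servers"));
    }
}
```

With a helper like this, switching from the local broker to the cluster broker would amount to changing the first argument of `build`.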
I didn't change any line of code. Here it is:

    DataStream<Tuple2<String, JSONLDObject>> stream = env
        .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
        .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
        .keyBy(0);

The only thing I changed is the Kafka IP address. The data model is unchanged. Unfortunately, when I start the Flink program now, I get this:

    INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
    12:30:48,446 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
    12:30:48,625 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator giordano-1-4-200:9092 (id: 2147483647 rack: null) for group groupId.
    12:30:48,626 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - *Marking the coordinator giordano-1-4-200:9092 (id: 2147483647 rack: null) dead for group groupId*

I marked in bold (between asterisks) the line that worries me. After that, no data is retrieved from Kafka, although Flink continues to perform checkpointing etc. normally.

Any ideas?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-tp14822.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.