Hi, we are running Flink 1.13.2 on Kinesis Analytics. Our source is a Kafka topic with a single partition so far, and we are using FlinkKafkaConsumer (flink-connector-kafka 1.13.2). Occasionally the consumer logs errors like the following:
    "locationInformation": "org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
    "logger": "org.apache.kafka.clients.FetchSessionHandler",
    "message": "[Consumer clientId=consumer-realtime-analytics-eu-production-node2-2, groupId=realtime-analytics-eu-production-node2] Error sending fetch request (sessionId=1343463307, epoch=172059) to node 3: org.apache.kafka.common.errors.DisconnectException.",
    "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map -> Filter -> Timestamps/Watermarks -> Filter (1/1)#0"

With debug logging enabled it appeared that this happens due to a request timeout, so I increased request.timeout.ms to 60000, but that did not resolve the issue. Even after the disconnection, I can see that the consumer sends a successful fetch request about one second later.

The problem we have noticed is that after the disconnection the application falls behind in processing: backpressure on the source reaches 100% and the app consumes events at a lower rate, even though there is not much traffic to cope with.

We use event time, and watermarks are not generated in the consumer since we have a single partition. The source is the following:

    DataStream<ServerAwareJournal> stream = env
        .addSource(consumerBase).name("Kafka").uid("Kafka")
        .filter(f -> !f.getServerId().equals("Demo150"))
        .keyBy(ServerAwareJournal::getServerId);

and then we assign the following watermark strategy:

    WatermarkStrategy.<ServerAwareJournal>forBoundedOutOfOrderness(Duration.ofSeconds(3))
        .withTimestampAssigner((element, recordTimestamp) ->
            element.getMessage().getDateTime().atZone(journalTimezone).toInstant().toEpochMilli())
        .withIdleness(Duration.ofMinutes(1));

Downstream there are 10 CEP operators, each with a parallelism of 15; their output is unioned and written to a Firehose sink.
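For reference, this is roughly how we set the consumer properties when trying the timeout increase; a minimal sketch, where the bootstrap servers value is a placeholder and only request.timeout.ms differs from the defaults:

```java
import java.util.Properties;

public class ConsumerProps {

    public static Properties build() {
        Properties props = new Properties();
        // Placeholder endpoint; the real brokers differ in our deployment.
        props.setProperty("bootstrap.servers", "broker1:9092");
        props.setProperty("group.id", "realtime-analytics-eu-production-node2");
        // Raised from the 30000 ms default while debugging the DisconnectException.
        props.setProperty("request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println(props.getProperty("request.timeout.ms"));
    }
}
```

The Properties object is then passed to the FlinkKafkaConsumer constructor along with the topic and the deserialization schema.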
Another thing worth mentioning: we run two parallel instances of the same application, i.e. two Kinesis Analytics nodes (one for debugging purposes); the debug node has checkpointing disabled. Could you please give me some advice on where to look to find a solution to this issue? Thanks in advance.