Hi, we are running Flink 1.13.2 on Kinesis Analytics. Our source is a Kafka
topic with a single partition so far, and we are using the FlinkKafkaConsumer
(kafka-connector-1.13.2).
Sometimes we get errors from the consumer like the one below:

"locationInformation":"org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
"logger": "org.apache.kafka.clients.FetchSessionHandler",
"message": "[Consumer
clientId=consumer-realtime-analytics-eu-production-node2-2,
groupId=realtime-analytics-eu-production-node2] Error sending fetch request
(sessionId=1343463307, epoch=172059) to node 3:
org.apache.kafka.common.errors.DisconnectException.",
    "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map
-> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",

With debug logging enabled it appeared that this happens due to a request
timeout, so I increased request.timeout.ms to 60000; however, that did not
resolve the issue. Even when I get the disconnection, I can see that about 1s
later the consumer sends a successful fetch request.
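
For reference, this is roughly how the consumer is built; the topic name,
bootstrap servers, and deserialization schema below are placeholders rather
than our real values:

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "broker:9092");    // placeholder
kafkaProps.setProperty("group.id", "realtime-analytics-eu-production-node2");
kafkaProps.setProperty("request.timeout.ms", "60000");         // raised from the 30000 default

FlinkKafkaConsumer<ServerAwareJournal> consumerBase = new FlinkKafkaConsumer<>(
        "journal-topic",                    // placeholder topic name
        new ServerAwareJournalSchema(),     // placeholder DeserializationSchema<ServerAwareJournal>
        kafkaProps);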

The problem we have noticed is that after the disconnection the application
falls behind in processing: the backpressure on the source reaches 100% and
the app consumes events at a lower rate, even though we do not have much
traffic to cope with.

We use event time, and the watermarks are not generated in the consumer since
we have one partition. The source is the following:

DataStream<ServerAwareJournal> stream =

env.addSource(consumerBase).name("Kafka").uid("Kafka").filter(f ->
!f.getServerId().equals("Demo150")).keyBy(ServerAwareJournal::getServerId);

and then we assign the following watermark strategy:

WatermarkStrategy.<ServerAwareJournal
>forBoundedOutOfOrderness(Duration.ofSeconds(3))
            .withTimestampAssigner((element, recordTimestamp) ->
element.getMessage().getDateTime().atZone(journalTimezone).toInstant()
                    .toEpochMilli()).withIdleness(Duration.ofMinutes(1));
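
The strategy is then attached with assignTimestampsAndWatermarks, roughly as
below (watermarkStrategy stands for the strategy shown above; the exact
placement in our chain may differ, the thread name above shows it sitting
between the filters):

// sketch: attach the strategy to the filtered/keyed stream
DataStream<ServerAwareJournal> withWatermarks =
        stream.assignTimestampsAndWatermarks(watermarkStrategy);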

Downstream of the source there are 10 CEP operators, each with a parallelism
of 15; the data emitted from the CEP operators is then unioned and written to
a Firehose sink, as sketched below.
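
A minimal sketch of that part of the job graph (patterns, Alert,
AlertProcessFunction and firehoseSink are illustrative names, not our actual
classes):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Alert> unioned = null;
for (Pattern<ServerAwareJournal, ?> pattern : patterns) {   // our 10 CEP patterns
    DataStream<Alert> matches = CEP.pattern(withWatermarks, pattern)
            .process(new AlertProcessFunction())            // PatternProcessFunction emitting alerts
            .setParallelism(15);
    unioned = (unioned == null) ? matches : unioned.union(matches);
}
unioned.addSink(firehoseSink);                              // Firehose sink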
Another thing: we run two parallel instances of the same application, i.e.
two Kinesis Analytics nodes (one for debugging purposes); the debug node has
checkpointing disabled.

Could you please give me some advice on where to look to find a solution to
this issue?
Thanks in advance
