Hi Qingsheng, thank you a lot for your response. The message I see from the consumer, right before the exception in the log I shared previously, is this:

"locationInformation": "org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:778)",
"logger": "org.apache.kafka.clients.NetworkClient",
"message": "[Consumer clientId=consumer-realtime-analytics-eu-production-node2-2, groupId=realtime-analytics-eu-production-node2] Disconnecting from node 3 due to request timeout."

I only saw it with debug logging enabled, and that is the reason I increased request.timeout.ms.
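For reference, this is roughly how the consumer is configured now (a simplified sketch rather than our exact code: the broker address, topic name and deserialization schema are placeholders; only the group id and the timeout value are the real ones):

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092");          // placeholder, not our real brokers
    props.setProperty("group.id", "realtime-analytics-eu-production-node2");
    // raised from the 30000 ms default after we saw the request-timeout disconnects
    props.setProperty("request.timeout.ms", "60000");

    FlinkKafkaConsumer<ServerAwareJournal> consumerBase = new FlinkKafkaConsumer<>(
            "journal-topic",                                         // placeholder topic name
            new ServerAwareJournalDeserializationSchema(),           // placeholder for our deserializer
            props);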
I will follow your advice and investigate the broker logs once the event occurs again.

Regarding the backpressure: the 10 CEP operators we have use some iterative conditions that add overhead, and in periods of high load the operators turn red in the Flink UI, so they contribute to the backpressure. However, under moderate load the operators perform fine, except when we have disconnections. It seems that after a disconnection the watermarks are not emitted quickly, which prevents the operators from releasing data to the sinks. I am not sure how much this helps, but is there any chance this is a problem with how we have configured the watermarks?

On Thu, Mar 24, 2022 at 10:27 AM, Qingsheng Ren <renqs...@gmail.com> wrote:

> Hi Isidoros,
>
> I'm not sure in which way the timeout and the high back pressure are
> related, but I think we can try to resolve the request timeout issue
> first. You can take a look at the request log on the Kafka broker and see
> if the request was received by the broker, and how long it took the broker
> to handle it. By default the request log is on WARN level, and you may
> want to increase it to DEBUG or TRACE to reveal more information.
>
> Another thought in my mind is about the content of the records, since you
> mentioned extremely high back pressure after the disconnection issue. If
> some messages are quite large or complex, they might block the network or
> require more resources for the serde, or even burden some operator in the
> pipeline and finally lead to back pressure. Once the back pressure happens
> in the pipeline, you can try to locate the operator causing it and analyse
> why the throughput drops, or dump the record to see if there is something
> special in it.
>
> Hope these could be helpful!
>
> Best regards,
>
> Qingsheng
>
>
> On Mar 23, 2022, at 19:19, Isidoros Ioannou <akis3...@gmail.com> wrote:
> >
> > Hi, we are running Flink 1.13.2 on Kinesis Analytics. Our source is a
> > Kafka topic with one partition so far, and we are using the
> > FlinkKafkaConsumer (kafka-connector-1.13.2).
> > Sometimes we get errors from the consumer like the one below:
> >
> > "locationInformation": "org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
> > "logger": "org.apache.kafka.clients.FetchSessionHandler",
> > "message": "[Consumer clientId=consumer-realtime-analytics-eu-production-node2-2, groupId=realtime-analytics-eu-production-node2] Error sending fetch request (sessionId=1343463307, epoch=172059) to node 3: org.apache.kafka.common.errors.DisconnectException.",
> > "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map -> Filter -> Timestamps/Watermarks -> Filter (1/1)#0"
> >
> > With debug logging it appeared that this happens due to a request
> > timeout, so I increased request.timeout.ms to 60000; however, it did not
> > resolve the issue. Even when the disconnection happens, I can see that
> > after 1s the consumer sends a successful fetch request.
> >
> > The problem we have noticed is that after the disconnection the
> > application falls behind in processing: the backpressure on the source
> > reaches 100% and the app consumes events at a lower rate, even though we
> > do not have much traffic to cope with.
> >
> > We use event time, and the watermarks are not generated in the consumer
> > since we have one partition.
> > The source is the following:
> >
> > DataStream<ServerAwareJournal> stream = env.addSource(consumerBase)
> >         .name("Kafka").uid("Kafka")
> >         .filter(f -> !f.getServerId().equals("Demo150"))
> >         .keyBy(ServerAwareJournal::getServerId);
> >
> > and then we assign the following watermark strategy:
> >
> > WatermarkStrategy.<ServerAwareJournal>forBoundedOutOfOrderness(Duration.ofSeconds(3))
> >         .withTimestampAssigner((element, recordTimestamp) ->
> >                 element.getMessage().getDateTime().atZone(journalTimezone).toInstant().toEpochMilli())
> >         .withIdleness(Duration.ofMinutes(1));
> >
> > The downstream operators are 10 CEP operators with a parallelism of 15;
> > the data emitted from the CEP operators is then unioned and added to a
> > Firehose sink.
> > Another thing is that we run two parallel instances of the same
> > application, i.e. two Kinesis Analytics nodes (one for debug purposes);
> > the debug node has checkpointing disabled.
> >
> > Could you please give me some advice on where to look to find a solution
> > to this issue?
> > Thanks in advance
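P.S. In case the quoting above makes the snippets hard to read, this is roughly how the watermarking is wired up on our side, in one place (a simplified sketch rather than our exact code: the variable names are illustrative and the Map/Filter operators in between are left out):

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;

    WatermarkStrategy<ServerAwareJournal> watermarkStrategy =
            WatermarkStrategy.<ServerAwareJournal>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((element, recordTimestamp) ->
                            element.getMessage().getDateTime().atZone(journalTimezone).toInstant().toEpochMilli())
                    .withIdleness(Duration.ofMinutes(1));

    // The strategy is attached with assignTimestampsAndWatermarks, which is what
    // appears as the "Timestamps/Watermarks" operator in the task name.
    DataStream<ServerAwareJournal> withWatermarks =
            stream.assignTimestampsAndWatermarks(watermarkStrategy);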