Hi Isidoros,

Your watermark strategy looks fine to me. I'm not quite sure whether it is related to the slowdown you are seeing after the disconnections.
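In case it helps to have it in one place, below is a minimal sketch of how I read the event-time setup from the snippets in your original message further down the thread. ServerAwareJournal, consumerBase and journalTimezone are taken from your code; the assignTimestampsAndWatermarks call and its exact position in the chain are my assumption, so please treat this as an illustration rather than your actual job:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch only: the strategy below is the one posted in the thread; attaching it
// via assignTimestampsAndWatermarks before the keyBy is an assumption.
WatermarkStrategy<ServerAwareJournal> strategy =
    WatermarkStrategy.<ServerAwareJournal>forBoundedOutOfOrderness(Duration.ofSeconds(3))
        .withTimestampAssigner((element, recordTimestamp) ->
            element.getMessage().getDateTime()
                .atZone(journalTimezone)
                .toInstant()
                .toEpochMilli())
        .withIdleness(Duration.ofMinutes(1));

DataStream<ServerAwareJournal> stream = env
    .addSource(consumerBase).name("Kafka").uid("Kafka")
    .filter(f -> !f.getServerId().equals("Demo150"))
    .assignTimestampsAndWatermarks(strategy)
    .keyBy(ServerAwareJournal::getServerId);
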
Best regards,
Qingsheng

> On Mar 24, 2022, at 21:11, Isidoros Ioannou <akis3...@gmail.com> wrote:
>
> Hi Qingsheng,
>
> thank you a lot for your response.
> The message I see from the consumer right before the exception I provided
> previously is this:
>
> "locationInformation":
> "org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:778)",
> "logger": "org.apache.kafka.clients.NetworkClient",
> "message": "[Consumer
> clientId=consumer-realtime-analytics-eu-production-node2-2,
> groupId=realtime-analytics-eu-production-node2] Disconnecting from node 3 due
> to request timeout."
>
> I saw it with debug logging enabled, and that is the reason I increased
> "request.timeout.ms".
> I will follow your advice and investigate the broker logs once the event
> occurs again.
>
> Regarding the backpressure: the 10 CEP operators we have use some iterative
> conditions that add overhead, and in periods of high load the operators turn
> red in the Flink UI, so they contribute to the backpressure.
> However, under moderate load the operators perform fine, except when we have
> disconnections. It seems that after a disconnection the watermarks are not
> emitted quickly, which keeps the operators from releasing their data to the
> sinks. I don't know if this helps, but is there any chance that this is a
> problem with how we have configured the watermarks?
>
> On Thu, Mar 24, 2022 at 10:27 AM, Qingsheng Ren <renqs...@gmail.com> wrote:
> Hi Isidoros,
>
> I'm not sure how the timeout and the high back pressure are related, but I
> think we can try to resolve the request timeout issue first. You can take a
> look at the request log on the Kafka broker to see whether the request was
> received by the broker and how long it took the broker to handle it. By
> default the request log is on WARN level, and you may want to increase it to
> DEBUG or TRACE to reveal more information.
>
> Another thought is about the content of the records, since you mentioned
> extremely high back pressure after the disconnection issue. If some messages
> are quite large or complex, they might saturate the network or require more
> resources for serialization/deserialization, or even burden some operator in
> the pipeline and finally lead to back pressure. Once back pressure appears in
> the pipeline, you can try to locate the operator causing it, analyze why its
> throughput drops, or dump the records to see if there is something special in
> them.
>
> Hope this helps!
>
> Best regards,
>
> Qingsheng
>
> > On Mar 23, 2022, at 19:19, Isidoros Ioannou <akis3...@gmail.com> wrote:
> >
> > Hi, we are running Flink 1.13.2 on Kinesis Analytics.
> > Our source is a Kafka topic with one partition so far, and we are using the
> > FlinkKafkaConsumer (kafka-connector-1.13.2).
> > Sometimes we get errors from the consumer like the one below:
> >
> > "locationInformation":
> > "org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
> > "logger": "org.apache.kafka.clients.FetchSessionHandler",
> > "message": "[Consumer
> > clientId=consumer-realtime-analytics-eu-production-node2-2,
> > groupId=realtime-analytics-eu-production-node2] Error sending fetch request
> > (sessionId=1343463307, epoch=172059) to node 3:
> > org.apache.kafka.common.errors.DisconnectException.",
> > "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map
> > -> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",
> >
> > With debug logging it appeared that this happens due to a request timeout,
> > so I increased request.timeout.ms to 60000; however, that did not resolve
> > the issue. Even when the disconnection happens, I can see that about 1s
> > later the consumer sends a successful fetch request.
> >
> > The problem we have noticed is that after the disconnection the application
> > falls behind in processing: the backpressure on the source reaches 100% and
> > the app consumes events at a lower rate, even though we do not have much
> > traffic to cope with.
> >
> > We use event time, and the watermarks are not generated in the consumer
> > since we have one partition. The source is the following:
> >
> > DataStream<ServerAwareJournal> stream = env
> >     .addSource(consumerBase).name("Kafka").uid("Kafka")
> >     .filter(f -> !f.getServerId().equals("Demo150"))
> >     .keyBy(ServerAwareJournal::getServerId);
> >
> > and then we assign the following watermark strategy:
> >
> > WatermarkStrategy.<ServerAwareJournal>forBoundedOutOfOrderness(Duration.ofSeconds(3))
> >     .withTimestampAssigner((element, recordTimestamp) ->
> >         element.getMessage().getDateTime()
> >             .atZone(journalTimezone)
> >             .toInstant()
> >             .toEpochMilli())
> >     .withIdleness(Duration.ofMinutes(1));
> >
> > The downstream operators are 10 CEP operators with a parallelism of 15;
> > the data emitted from the CEP operators is then unioned and added to a
> > Firehose sink.
> > Another thing is that we run two parallel instances of the same
> > application, i.e. two Kinesis Analytics nodes (one for debug purposes);
> > the debug node has checkpointing disabled.
> >
> > Could you please give me some advice on where to look to find a solution to
> > this issue?
> > Thanks in advance
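
P.S. Since request.timeout.ms came up a few times above: for anyone following along, this is roughly how such Kafka client settings are usually passed to the FlinkKafkaConsumer via the consumer Properties. Apart from request.timeout.ms = 60000 and the consumer group id, which appear in the thread, the values below (bootstrap servers, topic name, deserialization schema) are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker-1:9092");  // placeholder
props.setProperty("group.id", "realtime-analytics-eu-production-node2");
props.setProperty("request.timeout.ms", "60000");  // raised client-side request timeout, in ms

// Placeholder topic name and deserialization schema.
FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("journal-topic", new SimpleStringSchema(), props);
// The consumer would then be added as a source, e.g. env.addSource(consumer).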