Hi Isidoros, 

Your watermark strategy looks fine to me, so I’m not sure it is related to the issue you are seeing. 

Best regards, 

Qingsheng

> On Mar 24, 2022, at 21:11, Isidoros Ioannou <akis3...@gmail.com> wrote:
> 
> Hi Qingsheng,
> 
> thank you a lot for your response.
> The message I see from the consumer right before the exception I provided 
> previously is this:
> "locationInformation": 
> "org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:778)",
>     "logger": "org.apache.kafka.clients.NetworkClient",
>     "message": "[Consumer 
> clientId=consumer-realtime-analytics-eu-production-node2-2, 
> groupId=realtime-analytics-eu-production-node2] Disconnecting from node 3 due 
> to request timeout."
> 
> I saw it with debug logging enabled, and that's the reason I increased 
> "request.timeout.ms". 
> I will follow your advice and investigate the broker logs once the event 
> occurs again.
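> 
> For reference, here is a minimal sketch of how such a consumer property can be 
> passed to the FlinkKafkaConsumer (the bootstrap servers, topic name and 
> deserialization schema below are placeholders, not our actual values):
> 
> import java.util.Properties;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> 
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "broker1:9092");   // placeholder
> props.setProperty("group.id", "realtime-analytics-eu-production-node2");
> // raised from the 30000 ms consumer default while investigating the disconnections
> props.setProperty("request.timeout.ms", "60000");
> 
> FlinkKafkaConsumer<ServerAwareJournal> consumerBase = new FlinkKafkaConsumer<>(
>         "journal-topic",                                  // placeholder topic
>         new ServerAwareJournalDeserializationSchema(),    // placeholder schema
>         props);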
> 
> Regarding the backpressure: the 10 CEP operators we have use some iterative 
> conditions that add overhead, and in periods of high load the operators turn 
> red in the Flink UI, so they contribute to the backpressure.
> Under moderate load, however, the operators perform fine, except when we have 
> disconnections. It seems that after a disconnection the watermarks are not 
> emitted quickly, which keeps the operators from releasing their
> data to the sinks. I don't know if this helps, but is there any 
> chance that it is a problem of how we have configured the watermarks?
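> 
> In case it matters, the only watermark-related knob we have not touched is the 
> periodic emission interval. A minimal sketch of where it would be set (200 ms 
> is just the documented default, not a value we changed):
> 
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // periodic generators such as forBoundedOutOfOrderness emit watermarks on this interval
> env.getConfig().setAutoWatermarkInterval(200L); // milliseconds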
> 
> Στις Πέμ 24 Μαρ 2022 στις 10:27 π.μ., ο/η Qingsheng Ren <renqs...@gmail.com> 
> έγραψε:
> Hi Isidoros,
> 
> I’m not sure in what way the timeout and the high back pressure are 
> related, but I think we can try to resolve the request timeout issue first. 
> You can take a look at the request log on the Kafka broker to see whether the 
> request was received by the broker and how long it took the broker to handle 
> it. By default the request logger is set to WARN, and you may want to raise 
> its verbosity to DEBUG or TRACE to reveal more information. 
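> 
> As a rough sketch, assuming the stock config/log4j.properties shipped with the 
> broker (logger and appender names can differ per installation), the relevant 
> logger is kafka.request.logger:
> 
> # config/log4j.properties on the Kafka broker
> log4j.logger.kafka.request.logger=DEBUG, requestAppender
> log4j.additivity.kafka.request.logger=false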
> 
> Another thought is about the content of the records, since you mentioned 
> extremely high back pressure after the disconnection issue. If some messages 
> are quite large or complex, they might saturate the network or require more 
> resources for serialization/deserialization, put extra load on some operator 
> in the pipeline, and finally lead to back pressure. Once back pressure shows up 
> in the pipeline, you can try to locate the operator causing it and 
> analyze why its throughput drops, or dump the records to see 
> if there’s something special in them. 
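> 
> For example, a throwaway probe right after the source is one cheap way to spot 
> oversized records. This is only a sketch: the 1 MB threshold is arbitrary and 
> getRawPayload() is a hypothetical accessor for the serialized message bytes on 
> your record type.
> 
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> public class RecordSizeProbe extends RichMapFunction<ServerAwareJournal, ServerAwareJournal> {
>     private static final Logger LOG = LoggerFactory.getLogger(RecordSizeProbe.class);
> 
>     @Override
>     public ServerAwareJournal map(ServerAwareJournal record) {
>         // getRawPayload() is hypothetical; substitute however you can measure the record size
>         int size = record.getRawPayload().length;
>         if (size > 1_000_000) {
>             LOG.warn("Unusually large record ({} bytes) from server {}", size, record.getServerId());
>         }
>         return record;
>     }
> }
> 
> Attaching it with stream.map(new RecordSizeProbe()).name("record-size-probe") 
> would keep it out of the way of the rest of the pipeline.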
> 
> Hope this helps! 
> 
> Best regards, 
> 
> Qingsheng
> 
> > On Mar 23, 2022, at 19:19, Isidoros Ioannou <akis3...@gmail.com> wrote:
> > 
> > Hi, we are running Flink 1.13.2 on Kinesis Analytics. Our source is 
> > a Kafka topic with one partition so far, and we are using the 
> > FlinkKafkaConsumer (kafka-connector-1.13.2). 
> > Sometimes we get errors from the consumer like the one below:
> > 
> > "locationInformation":"org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
> > "logger": "org.apache.kafka.clients.FetchSessionHandler",
> > "message": "[Consumer 
> > clientId=consumer-realtime-analytics-eu-production-node2-2, 
> > groupId=realtime-analytics-eu-production-node2] Error sending fetch request 
> > (sessionId=1343463307, epoch=172059) to node 3: 
> > org.apache.kafka.common.errors.DisconnectException.",
> >     "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map 
> > -> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",
> > 
> > With debug logging it appeared that this happens due to a request timeout, 
> > so I increased request.timeout.ms to 60000; however, that did not 
> > resolve the issue. Even when I get the disconnection, I can see that after 1s 
> > the consumer sends a successful fetch request again.
> > 
> > The problem we have noticed is that after the disconnection the application 
> > falls behind in processing: the backpressure on the source reaches 100% and 
> > the app consumes events at a lower rate, even though we do not have much 
> > traffic to cope with. 
> > 
> > We use event time, and the watermarks are not generated in the consumer since 
> > we have one partition. The source is the following:
> > 
> > DataStream<ServerAwareJournal> stream = env
> >         .addSource(consumerBase).name("Kafka").uid("Kafka")
> >         .filter(f -> !f.getServerId().equals("Demo150"))
> >         .keyBy(ServerAwareJournal::getServerId);
> > 
> > and then we assign the following watermark strategy: 
> > 
> > WatermarkStrategy.<ServerAwareJournal>forBoundedOutOfOrderness(Duration.ofSeconds(3))
> >         .withTimestampAssigner((element, recordTimestamp) ->
> >                 element.getMessage().getDateTime().atZone(journalTimezone)
> >                         .toInstant().toEpochMilli())
> >         .withIdleness(Duration.ofMinutes(1));
> > 
> > The downstream operators are 10 CEP operators with a parallelism of 15; the 
> > data emitted from the CEP operators is then unioned and added 
> > to a Firehose sink.
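> > 
> > As a rough sketch of that fan-in (MatchResult, cepOutputs and firehoseSink are 
> > placeholders for our actual result type, CEP output streams and sink instance):
> > 
> > DataStream<MatchResult> unioned = cepOutputs.get(0);
> > for (DataStream<MatchResult> next : cepOutputs.subList(1, cepOutputs.size())) {
> >     unioned = unioned.union(next);  // DataStream#union merges streams of the same type
> > }
> > unioned.addSink(firehoseSink).name("Firehose");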
> > Another thing is that we run two parallel instances of the same application, 
> > i.e. two Kinesis Analytics nodes (one for debug purposes); the debug node 
> > has checkpointing disabled.
> > 
> > Could you please give me some advice on where to look to find a solution to 
> > this issue?
> > Thanks in advance
> > 
> > 
> 
