Hi Stephan,

Sorry for the late response. We do indeed use timers inside a KeyedProcessFunction, but the timer triggers are fairly evenly distributed, so they are not causing a firing storm. We have custom TTL logic which the deletion timer uses to decide whether or not to delete a record from the in-memory state. Could you maybe share some links to those timer-related changes in RocksDB?
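For context, the relevant part of our operator is roughly shaped like the minimal sketch below (heavily simplified; the class name, field names, record type, the use of processing-time timers and the TTL value are illustrative placeholders, not our actual code):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProfileAggregator extends KeyedProcessFunction<String, ProfileEvent, ProfileEvent> {

    // Illustrative TTL; the real value comes from our configuration.
    private static final long TTL_MS = 24 * 60 * 60 * 1000L;

    private transient ValueState<ProfileEvent> profileState;
    private transient ValueState<Long> lastUpdateState;

    @Override
    public void open(Configuration parameters) {
        profileState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("profile", ProfileEvent.class));
        lastUpdateState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastUpdate", Long.class));
    }

    @Override
    public void processElement(ProfileEvent event, Context ctx, Collector<ProfileEvent> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        profileState.update(event);
        lastUpdateState.update(now);
        // The deletion timer lands at lastUpdate + TTL, so timers fire spread
        // out over time, following the update pattern of the keys.
        ctx.timerService().registerProcessingTimeTimer(now + TTL_MS);
        out.collect(event);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ProfileEvent> out) throws Exception {
        // Custom TTL check: only drop the record if it has not been updated
        // since the timer that is firing now was registered.
        Long lastUpdate = lastUpdateState.value();
        if (lastUpdate != null && timestamp >= lastUpdate + TTL_MS) {
            profileState.clear();
            lastUpdateState.clear();
        }
    }
}

/** Placeholder for our actual record type. */
class ProfileEvent {
    public String userId;
    public long timestamp;
}

Because each key re-registers its deletion timer at lastUpdate + TTL, the timer firings follow the (already spread out) update pattern of the keys instead of all landing at the same instant.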
Thanks in advance,
Bekir Oguz

On Fri, 30 Aug 2019 at 14:56, Stephan Ewen <se...@apache.org> wrote:

> Hi all!
>
> A thought would be that this has something to do with timers. Does the task with that behavior use timers (windows, or process function)?
>
> If that is the case, some theories to check:
> - Could it be a timer firing storm coinciding with a checkpoint? Currently, that storm fires synchronously and checkpoints cannot preempt it, which should change in 1.10 with the new mailbox model.
> - Could the timer-async checkpointing changes have something to do with that? Does some of the usually small "preparation work" (happening synchronously) lead to an unwanted effect?
> - Are you using TTL for state in that operator?
> - There were some changes made to support timers in RocksDB recently. Could that have something to do with it?
>
> Best,
> Stephan
>
> On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> CC flink dev mail list
>> Update for those who may be interested in this issue: we are still diagnosing this problem.
>>
>> Best,
>> Congxian
>>
>> On Thu, 29 Aug 2019 at 20:58, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>
>> > Hi Bekir
>> >
>> > Currently, from what we have diagnosed, there is some task that completes its checkpoint too late (maybe 15 mins), but we checked the Kafka broker log and did not find anything interesting there. Could we run another job that does not commit offsets to Kafka? This is to check whether it is the "commit offset to Kafka" step that consumes too much time.
>> >
>> > Best,
>> > Congxian
>> >
>> > On Wed, 28 Aug 2019 at 16:19, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>> >
>> >> Hi Congxian,
>> >> Sorry for the late reply, but no progress on this issue yet. I also checked the Kafka broker logs and found nothing interesting there. And we still have 15-minute checkpoints quite often. Any more ideas on where to look?
>> >>
>> >> Regards,
>> >> Bekir
>> >>
>> >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu <qcx978132...@gmail.com> wrote:
>> >>
>> >>> Hi Bekir
>> >>>
>> >>> Are you back at work now? Are there any more findings on this problem?
>> >>>
>> >>> Best,
>> >>> Congxian
>> >>>
>> >>> On Tue, 13 Aug 2019 at 16:39, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>> >>>
>> >>>> Hi Congxian,
>> >>>> Thanks for following up on this issue. It is still unresolved and I am on vacation at the moment. Hopefully my colleagues Niels and Vlad can spare some time to look into this.
>> >>>>
>> >>>> @Niels, @Vlad: do you guys also think that this issue might be Kafka-related? We could also check the Kafka broker logs at the time of the long checkpointing.
>> >>>>
>> >>>> Thanks,
>> >>>> Bekir
>> >>>>
>> >>>> Sent from my iPhone
>> >>>>
>> >>>> On 12 Aug 2019, at 15:18, Congxian Qiu <qcx978132...@gmail.com> wrote:
>> >>>>
>> >>>> Hi Bekir
>> >>>>
>> >>>> Is there any progress on this problem?
>> >>>>
>> >>>> Best,
>> >>>> Congxian
>> >>>>
>> >>>> On Thu, 8 Aug 2019 at 23:17, Congxian Qiu <qcx978132...@gmail.com> wrote:
>> >>>>
>> >>>>> Hi Bekir
>> >>>>> Thanks for the information.
>> >>>>>
>> >>>>> - The source's checkpoint is triggered by RPC calls, so it has the "Trigger checkpoint xxx" log;
>> >>>>> - other tasks' checkpoints are triggered after receiving all the barriers from their upstreams, so they do not log "Trigger checkpoint XXX" :(
>> >>>>>
>> >>>>> Your diagnosis makes sense to me; we need to check the Kafka log. I also found that we always have a log like "org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null) dead for group userprofileaggregator 2019-08-06 13:58:49,872 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Notifica",
>> >>>>>
>> >>>>> I checked the Kafka docs [1] and only found that the default of `transaction.max.timeout.ms` is 15 min.
>> >>>>>
>> >>>>> Please let me know if you have any findings. Thanks.
>> >>>>>
>> >>>>> PS: maybe you can also check the log for task `d0aa98767c852c97ae8faf70a54241e3`; the JM received its ack message late as well.
>> >>>>>
>> >>>>> [1] https://kafka.apache.org/documentation/
>> >>>>> Best,
>> >>>>> Congxian
>> >>>>>
>> >>>>> On Wed, 7 Aug 2019 at 18:48, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>> >>>>>
>> >>>>>> Hi Congxian,
>> >>>>>> Thanks for checking the logs. What I see from the logs is:
>> >>>>>>
>> >>>>>> - For the tasks like "Source: profileservice-snowplow-clean-events_kafka_source -> Filter" {17, 27, 31, 33, 34} / 70: we have the 'Triggering checkpoint' and also 'Confirm checkpoint' log lines, with a 15-minute delay in between.
>> >>>>>> - For the tasks like "KeyedProcess -> (Sink: profileservice-userprofiles_kafka_sink, Sink: profileservice-userprofiles_kafka_deletion_marker, Sink: profileservice-profiledeletion_kafka_sink)" {1,2,3,4,5}/70: we DO NOT have the "Triggering checkpoint" log, but only the 'Confirm checkpoint' lines.
>> >>>>>>
>> >>>>>> And as a final point, we ALWAYS have Kafka AbstractCoordinator logs about a lost connection to Kafka at the same time the checkpoints are confirmed. This 15-minute delay might be caused by some timeout at the Kafka client (maybe a 15-minute timeout), which then marks the Kafka coordinator dead and then discovers the Kafka coordinator again.
>> >>>>>>
>> >>>>>> If the Kafka connection is IDLE for 15 minutes, Flink cannot confirm the checkpoints and cannot send the async offset commit request to Kafka. This could be the root cause of the problem. Please see the attached logs filtered on the Kafka AbstractCoordinator. Every time we have a 15-minute checkpoint, we have this Kafka issue. (It happened today at 9:14 and 9:52.)
>> >>>>>>
>> >>>>>> I will enable Kafka DEBUG logging to see more and let you know about the findings.
>> >>>>>>
>> >>>>>> Thanks a lot for your support,
>> >>>>>> Bekir Oguz
>> >>>>>>
>> >>>>>> On 7 Aug 2019, at 12:06, Congxian Qiu <qcx978132...@gmail.com> wrote:
>> >>>>>>
>> >>>>>> Hi
>> >>>>>>
>> >>>>>> Received all the files. At a first glance, the program uses at-least-once checkpoint mode. From the TM log, maybe we need to check the checkpoint of this operator: "Invoking async call Checkpoint Confirmation for KeyedProcess -> (Sink: profileservice-userprofiles_kafka_sink, Sink: profileservice-userprofiles_kafka_deletion_marker, Sink: profileservice-profiledeletion_kafka_sink) (5/70) on task KeyedProcess -> (Sink: profileservice-userprofiles_kafka_sink, Sink: profileservice-userprofiles_kafka_deletion_marker, Sink: profileservice-profiledeletion_kafka_sink) (5/70)".
>> >>>>>>
>> >>>>>> It seems it took too long to complete the checkpoint (maybe something in the operator itself, or maybe something on the Kafka side). I'll go through the logs carefully again today or tomorrow.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Congxian
>> >>>>>>
>> >>>>>> On Tue, 6 Aug 2019 at 22:38, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>> >>>>>>
>> >>>>>>> Ok, I am removing the Apache dev group from CC. Only sending to you and my colleagues.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Bekir
>> >>>>>>>
>> >>>>>>> On 6 Aug 2019, at 17:33, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>> >>>>>>>
>> >>>>>>> Hi Congxian,
>> >>>>>>> The previous email didn't go through due to size limits. I am sending you only the job manager log (zipped) and will send the other info in a separate email.
>> >>>>>>> <jobmanager_sb77v.log.zip>
>> >>>>>>> Regards,
>> >>>>>>> Bekir
>> >>>>>>>
>> >>>>>>> On 2 Aug 2019, at 16:37, Congxian Qiu <qcx978132...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>> Hi Bekir
>> >>>>>>>
>> >>>>>>> Could you please also share the information below:
>> >>>>>>> - jobmanager.log
>> >>>>>>> - taskmanager.log (with debug info enabled) for the problematic subtask
>> >>>>>>> - the DAG of your program (if you can provide the skeleton program, that is even better -- you can send it to me privately)
>> >>>>>>>
>> >>>>>>> For the subIndex, maybe we can use the deploy log message in the jobmanager log to identify which subtask we want. For example, in the JM log we'll have something like "2019-08-02 11:38:47,291 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (2/2) (attempt #0) to container_e62_1551952890130_2071_01_000002 @ aa.bb.cc.dd.ee (dataPort=39488)", then we know "Custom Source (2/2)" was deployed to "aa.bb.cc.dd.ee" with port 39488. Sadly, there may still be more than one subtask in one container :(
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Congxian
>> >>>>>>>
>> >>>>>>> On Fri, 2 Aug 2019 at 16:22, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>> >>>>>>>
>> >>>>>>>> Forgot to add the checkpoint details after it was complete. This is for that long-running checkpoint with id 95632.
>> >>>>>>>>
>> >>>>>>>> <PastedGraphic-5.png>
>> >>>>>>>>
>> >>>>>>>> On 2 Aug 2019, at 11:18, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>> >>>>>>>>
>> >>>>>>>> Hi Congxian,
>> >>>>>>>> I was able to fetch the logs of the task manager (attached) and the screenshots of the latest long checkpoint. I will get the logs of the job manager for the next long-running checkpoint, and I will also try to get a jstack during the long-running checkpoint.
>> >>>>>>>>
>> >>>>>>>> Note: since the Subtasks tab does not show the subtask numbers, and the Details tab of the checkpoint shows the subtask numbers but not the task manager hosts, it is difficult to match them. We're assuming they have the same order, so seeing that the 3rd subtask is failing, I take the 3rd line in the Subtasks tab, which leads to the task manager host flink-taskmanager-84ccd5bddf-2cbxn. ***It would be a great feature if you guys also included the subtask ids in the Subtasks view.***
>> >>>>>>>>
>> >>>>>>>> Note: timestamps in the task manager log are in UTC and I am currently in zone UTC+3, so the time 10:30 in the screenshot matches the time 7:30 in the log.
>> >>>>>>>>
>> >>>>>>>> Kind regards,
>> >>>>>>>> Bekir
>> >>>>>>>>
>> >>>>>>>> <task_manager.log>
>> >>>>>>>>
>> >>>>>>>> <PastedGraphic-4.png>
>> >>>>>>>> <PastedGraphic-3.png>
>> >>>>>>>> <PastedGraphic-2.png>
>> >>>>>>>>
>> >>>>>>>> On 2 Aug 2019, at 07:23, Congxian Qiu <qcx978132...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>> Hi Bekir
>> >>>>>>>> I'll first summarize the problem here (please correct me if I'm wrong):
>> >>>>>>>> 1. The same program running on 1.6 never encountered such problems.
>> >>>>>>>> 2. Some checkpoints take too long to complete (15+ min), but other, normal checkpoints complete in less than 1 min.
>> >>>>>>>> 3. Some bad checkpoints have a large sync time; the async time seems OK.
>> >>>>>>>> 4. For some bad checkpoints, the e2e duration is much bigger than (sync_time + async_time).
>> >>>>>>>> First, to answer the last point: the e2e duration is ack_time - trigger_time, so it is always bigger than (sync_time + async_time), but we have a big gap here, and this may be problematic.
>> >>>>>>>> According to all the information, maybe the problem is that some task starts its checkpoint too late and the sync part of the checkpoint takes too long. Could you please share some more information, such as:
>> >>>>>>>> - A screenshot of the summary for one bad checkpoint (we'll call it A here)
>> >>>>>>>> - The detailed information of checkpoint A (including all the problematic subtasks)
>> >>>>>>>> - jobmanager.log and taskmanager.log for the problematic task and for a healthy task
>> >>>>>>>> - A screenshot of the subtasks for the problematic task (including the `Bytes received`, `Records received`, `Bytes sent`, `Records sent` columns); this is to compare the information of the problematic parallelism and a good parallelism. Please also share whether there is data skew among the parallelisms.
>> >>>>>>>> - Some jstacks of the problematic parallelism -- this is to check whether the task is too busy to handle the barrier. (A flame graph or other material is always welcome here.)
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Congxian
>> >>>>>>>>
>> >>>>>>>> On Thu, 1 Aug 2019 at 20:26, Congxian Qiu <qcx978132...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Bekir
>> >>>>>>>>>
>> >>>>>>>>> I'll first comb through all the information here and try to find out the reason with you; I may need you to share some more information :)
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Congxian
>> >>>>>>>>>
>> >>>>>>>>> On Thu, 1 Aug 2019 at 17:00, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Fabian,
>> >>>>>>>>>> Thanks for sharing this with us, but we're already on version 1.8.1.
>> >>>>>>>>>>
>> >>>>>>>>>> What I don't understand is which mechanism in Flink occasionally adds 15 minutes to the checkpoint duration. Can you maybe give us some hints on where to look? Is there a default timeout of 15 minutes defined somewhere in Flink? I couldn't find one.
>> >>>>>>>>>>
>> >>>>>>>>>> In our pipeline, most of the checkpoints complete in less than a minute, and some of them complete in 15 minutes plus a little extra (less than a minute). There's definitely something which adds 15 minutes. This is happening in one or more subtasks during checkpointing.
>> >>>>>>>>>>
>> >>>>>>>>>> Please see the screenshot below:
>> >>>>>>>>>>
>> >>>>>>>>>> Regards,
>> >>>>>>>>>> Bekir
>> >>>>>>>>>>
>> >>>>>>>>>> On 23 Jul 2019, at 16:37, Fabian Hueske <fhue...@gmail.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Bekir,
>> >>>>>>>>>>
>> >>>>>>>>>> Another user reported checkpointing issues with Flink 1.8.0 [1]. These seem to be resolved with Flink 1.8.1.
>> >>>>>>>>>>
>> >>>>>>>>>> Hope this helps,
>> >>>>>>>>>> Fabian
>> >>>>>>>>>>
>> >>>>>>>>>> [1] https://lists.apache.org/thread.html/991fe3b09fd6a052ff52e5f7d9cdd9418545e68b02e23493097d9bc4@%3Cuser.flink.apache.org%3E
>> >>>>>>>>>>
>> >>>>>>>>>> On Wed, 17 Jul 2019 at 09:16, Congxian Qiu <qcx978132...@gmail.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Bekir
>> >>>>>>>>>>
>> >>>>>>>>>> First of all, I think there is something wrong: the state size is almost the same, but the durations differ so much.
>> >>>>>>>>>>
>> >>>>>>>>>> The checkpoint for the RocksDB state backend dumps sst files, then copies the needed sst files (if you enable incremental checkpoints, the sst files already on the remote storage will not be uploaded again), then completes the checkpoint. Can you check the network bandwidth usage during the checkpoint?
>> >>>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Congxian
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, 16 Jul 2019 at 22:45, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi all,
>> >>>>>>>>>> We have a Flink job with user state, checkpointing to the RocksDB backend, which is externally stored in AWS S3.
>> >>>>>>>>>> After we migrated our cluster from 1.6 to 1.8, we occasionally see that some slots do not acknowledge the checkpoints quickly enough. As an example: all slots acknowledge within 30-50 seconds except one slot, which acknowledges in 15 minutes. Checkpoint sizes are similar to each other, around 200-400 MB.
>> >>>>>>>>>>
>> >>>>>>>>>> We did not experience this weird behaviour in Flink 1.6. We have a 5-minute checkpoint interval, and this happens sometimes once an hour, sometimes more often, but not in all the checkpoint requests. Please see the screenshot below.
>> >>>>>>>>>>
>> >>>>>>>>>> Another point: for the faulty slots, the duration is consistently 15 minutes and some seconds; we couldn't find out where this 15-minute response time comes from. And each time it is a different task manager, not always the same one.
>> >>>>>>>>>>
>> >>>>>>>>>> Are you guys aware of any other users having similar issues with the new version, and of a suggested bug fix or solution?
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks in advance,
>> >>>>>>>>>> Bekir Oguz
>> >>>>>>>>>>
>> >> --
>> >> -- Bekir Oguz
>> >

--
-- Bekir Oguz