Hi Bekir,

If you are asking about where timers are stored: with RocksDBStateBackend, timers can be kept either on the heap or in RocksDB [1][2].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#rocksdb-state-backend-config-options
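As a rough sketch of how that switch looks (based on the docs in [1][2] above; I have not verified this against your exact setup, so please double-check the option name and values for your Flink version), in flink-conf.yaml it should be something like:

    state.backend.rocksdb.timer-service.factory: HEAP   # or ROCKSDB

or programmatically when configuring the backend (API as of Flink 1.8, the checkpoint URI below is only a placeholder):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TimerStorageExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Placeholder URI: point this at your real S3 checkpoint bucket.
            RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://<your-bucket>/checkpoints", true); // incremental checkpoints
            // Keep timer state on the heap instead of in RocksDB
            // (use PriorityQueueStateType.ROCKSDB to store timers in RocksDB).
            backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.HEAP);
            env.setStateBackend(backend);
            // ... build and execute your pipeline here ...
        }
    }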
Best,
Congxian

Bekir Oguz <bekir.o...@persgroep.net> wrote on Wed, 4 Sep 2019 at 23:38:

> Hi Stephan,
> sorry for the late response.
> We indeed use timers inside a KeyedProcessFunction, but the triggers of the timers are fairly evenly distributed, so they are not causing a firing storm.
> We have custom TTL logic which is used by the deletion timer to decide whether to delete a record from in-memory state or not.
> Can you maybe give some links to those changes in RocksDB?
>
> Thanks in advance,
> Bekir Oguz
>
> On Fri, 30 Aug 2019 at 14:56, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi all!
>>
>> A thought would be that this has something to do with timers. Does the task with that behavior use timers (windows, or process function)?
>>
>> If that is the case, some theories to check:
>> - Could it be a timer firing storm coinciding with a checkpoint? Currently, that storm fires synchronously and checkpoints cannot preempt it, which should change in 1.10 with the new mailbox model.
>> - Could the timer-async checkpointing changes have something to do with that? Does some of the usually small "preparation work" (happening synchronously) lead to an unwanted effect?
>> - Are you using TTL for state in that operator?
>> - There were some changes made to support timers in RocksDB recently. Could that have something to do with it?
>>
>> Best,
>> Stephan
>>
>> On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>>
>>> CC'ing the flink dev mail list.
>>> Update for those who may be interested in this issue: we're still diagnosing this problem.
>>>
>>> Best,
>>> Congxian
>>>
>>> Congxian Qiu <qcx978132...@gmail.com> wrote on Thu, 29 Aug 2019 at 20:58:
>>>
>>> > Hi Bekir
>>> >
>>> > Currently, from what we have diagnosed, there are some tasks that complete their checkpoint too late (maybe 15 mins), but we checked the Kafka broker log and did not find anything interesting there. Could we run another job that does not commit offsets to Kafka? This would check whether it is the "commit offset to Kafka" step that consumes too much time.
>>> >
>>> > Best,
>>> > Congxian
>>> >
>>> > Bekir Oguz <bekir.o...@persgroep.net> wrote on Wed, 28 Aug 2019 at 16:19:
>>> >
>>> >> Hi Congxian,
>>> >> sorry for the late reply, but no progress on this issue yet. I also checked the Kafka broker logs and found nothing interesting there.
>>> >> And we still have 15 min duration checkpoints quite often. Any more ideas on where to look?
>>> >>
>>> >> Regards,
>>> >> Bekir
>>> >>
>>> >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>> >>
>>> >>> Hi Bekir
>>> >>>
>>> >>> Are you back at work now? Are there any more findings on this problem?
>>> >>>
>>> >>> Best,
>>> >>> Congxian
>>> >>>
>>> >>> Bekir Oguz <bekir.o...@persgroep.net> wrote on Tue, 13 Aug 2019 at 16:39:
>>> >>>
>>> >>>> Hi Congxian,
>>> >>>> Thanks for following up on this issue. It is still unresolved and I am on vacation at the moment. Hopefully my colleagues Niels and Vlad can spare some time to look into this.
>>> >>>>
>>> >>>> @Niels, @Vlad: do you guys also think that this issue might be Kafka related? We could also check the Kafka broker logs at the time of the long checkpointing.
>>> >>>>
>>> >>>> Thanks,
>>> >>>> Bekir
>>> >>>>
>>> >>>> Sent from my iPhone
>>> >>>>
>>> >>>> On 12 Aug 2019, at 15:18, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>> >>>>
>>> >>>> Hi Bekir
>>> >>>>
>>> >>>> Is there any progress on this problem?
>>> >>>>
>>> >>>> Best,
>>> >>>> Congxian
>>> >>>>
>>> >>>> Congxian Qiu <qcx978132...@gmail.com> wrote on Thu, 8 Aug 2019 at 23:17:
>>> >>>>
>>> >>>>> Hi Bekir,
>>> >>>>> Thanks for the information.
>>> >>>>>
>>> >>>>> - The source's checkpoint was triggered by RPC calls, so it has the "Trigger checkpoint xxx" log.
>>> >>>>> - The other tasks' checkpoints were triggered after receiving all the barriers from their upstreams, so they didn't log "Trigger checkpoint XXX" :(
>>> >>>>>
>>> >>>>> Your diagnosis makes sense to me; we need to check the Kafka log.
>>> >>>>> I also found out that we always have a log like "org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null) dead for group userprofileaggregator 2019-08-06 13:58:49,872 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Notifica",
>>> >>>>>
>>> >>>>> I checked the Kafka docs [1] and only found that the default of `transaction.max.timeout.ms` is 15 min.
>>> >>>>>
>>> >>>>> Please let me know if you have any findings. Thanks.
>>> >>>>>
>>> >>>>> PS: maybe you can also check the log for task `d0aa98767c852c97ae8faf70a54241e3`; the JM received its ack message late as well.
>>> >>>>>
>>> >>>>> [1] https://kafka.apache.org/documentation/
>>> >>>>> Best,
>>> >>>>> Congxian
>>> >>>>>
>>> >>>>> Bekir Oguz <bekir.o...@persgroep.net> wrote on Wed, 7 Aug 2019 at 18:48:
>>> >>>>>
>>> >>>>>> Hi Congxian,
>>> >>>>>> Thanks for checking the logs. What I see from the logs is:
>>> >>>>>>
>>> >>>>>> - For the tasks like "Source: profileservice-snowplow-clean-events_kafka_source -> Filter" {17, 27, 31, 33, 34} / 70: we have the 'Triggering checkpoint' and also 'Confirm checkpoint' log lines, with a 15 mins delay in between.
>>> >>>>>> - For the tasks like "KeyedProcess -> (Sink: profileservice-userprofiles_kafka_sink, Sink: profileservice-userprofiles_kafka_deletion_marker, Sink: profileservice-profiledeletion_kafka_sink)" {1,2,3,4,5}/70: we DO NOT have the "Triggering checkpoint" log, but only the 'Confirm checkpoint' lines.
>>> >>>>>>
>>> >>>>>> And as a final point, we ALWAYS have Kafka AbstractCoordinator logs about a lost connection to Kafka at the same time the checkpoints are confirmed. This 15 minutes delay might be because of some timeout at the Kafka client (maybe a 15 mins timeout), then marking the Kafka coordinator dead, and then discovering the Kafka coordinator again.
>>> >>>>>>
>>> >>>>>> If the Kafka connection is IDLE during those 15 mins, Flink cannot confirm the checkpoints and cannot send the async offset commit request to Kafka. This could be the root cause of the problem. Please see the attached logs filtered on the Kafka AbstractCoordinator. Every time we have a 15-minute checkpoint, we have this Kafka issue. (Happened today at 9:14 and 9:52.)
>>> >>>>>>
>>> >>>>>> I will enable Kafka DEBUG logging to see more and let you know about the findings.
>>> >>>>>>
>>> >>>>>> Thanks a lot for your support,
>>> >>>>>> Bekir Oguz
>>> >>>>>>
>>> >>>>>> On 7 Aug 2019, at 12:06, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>> >>>>>>
>>> >>>>>> Hi
>>> >>>>>>
>>> >>>>>> Received all the files. At first glance, the program uses at-least-once checkpoint mode. From the TM log, maybe we need to check the checkpoint of this operator: "Invoking async call Checkpoint Confirmation for KeyedProcess -> (Sink: profileservice-userprofiles_kafka_sink, Sink: profileservice-userprofiles_kafka_deletion_marker, Sink: profileservice-profiledeletion_kafka_sink) (5/70) on task KeyedProcess -> (Sink: profileservice-userprofiles_kafka_sink, Sink: profileservice-userprofiles_kafka_deletion_marker, Sink: profileservice-profiledeletion_kafka_sink) (5/70)",
>>> >>>>>>
>>> >>>>>> It seems it took too long to complete the checkpoint (maybe something about the task itself, or maybe something on the Kafka side). I'll go through the logs carefully again today or tomorrow.
>>> >>>>>>
>>> >>>>>> Best,
>>> >>>>>> Congxian
>>> >>>>>>
>>> >>>>>> Bekir Oguz <bekir.o...@persgroep.net> wrote on Tue, 6 Aug 2019 at 22:38:
>>> >>>>>>
>>> >>>>>>> Ok, I am removing the Apache dev group from CC.
>>> >>>>>>> Only sending to you and my colleagues.
>>> >>>>>>>
>>> >>>>>>> Thanks,
>>> >>>>>>> Bekir
>>> >>>>>>>
>>> >>>>>>> On 6 Aug 2019, at 17:33, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>>> >>>>>>>
>>> >>>>>>> Hi Congxian,
>>> >>>>>>> The previous email didn't work out due to size limits.
>>> >>>>>>> I am sending you only the job manager log zipped, and will send the other info in a separate email.
>>> >>>>>>> <jobmanager_sb77v.log.zip>
>>> >>>>>>> Regards,
>>> >>>>>>> Bekir
>>> >>>>>>>
>>> >>>>>>> On 2 Aug 2019, at 16:37, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>> >>>>>>>
>>> >>>>>>> Hi Bekir
>>> >>>>>>>
>>> >>>>>>> Could you please also share the information below:
>>> >>>>>>> - jobmanager.log
>>> >>>>>>> - taskmanager.log (with debug info enabled) for the problematic subtask.
>>> >>>>>>> - the DAG of your program (providing the skeleton program would be even better -- you can send it to me privately)
>>> >>>>>>>
>>> >>>>>>> For the subIndex, maybe we can use the deploy log message in the jobmanager log to identify which subtask we want. For example, in the JM log we'll have something like "2019-08-02 11:38:47,291 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (2/2) (attempt #0) to container_e62_1551952890130_2071_01_000002 @ aa.bb.cc.dd.ee (dataPort=39488)", then we know "Custom Source (2/2)" was deployed to "aa.bb.cc.dd.ee" with port 39488. Sadly, there may still be more than one subtask in one container :(
>>> >>>>>>>
>>> >>>>>>> Best,
>>> >>>>>>> Congxian
>>> >>>>>>>
>>> >>>>>>> Bekir Oguz <bekir.o...@persgroep.net> wrote on Fri, 2 Aug 2019 at 16:22:
>>> >>>>>>>
>>> >>>>>>>> Forgot to add the checkpoint details after it was completed. This is for that long-running checkpoint with id 95632.
>>> >>>>>>>>
>>> >>>>>>>> <PastedGraphic-5.png>
>>> >>>>>>>>
>>> >>>>>>>> On 2 Aug 2019, at 11:18, Bekir Oguz <bekir.o...@persgroep.net> wrote:
>>> >>>>>>>>
>>> >>>>>>>> Hi Congxian,
>>> >>>>>>>> I was able to fetch the logs of the task manager (attached) and the screenshots of the latest long checkpoint. I will get the logs of the job manager for the next long-running checkpoint, and I will also try to get a jstack during the long-running checkpoint.
>>> >>>>>>>>
>>> >>>>>>>> Note: Since the Subtasks tab does not show the subtask numbers, and the Details tab of the checkpoint shows the subtask numbers but not the task manager hosts, it is difficult to match them. We're assuming they have the same order, so seeing that the 3rd subtask is failing, I am taking the 3rd line in the Subtasks tab, which leads to the task manager host flink-taskmanager-84ccd5bddf-2cbxn. ***It would be a great feature if you guys also included the subtask IDs in the Subtasks view.***
>>> >>>>>>>>
>>> >>>>>>>> Note: timestamps in the task manager log are in UTC and I am at the moment in zone UTC+3, so the time 10:30 in the screenshot matches the time 7:30 in the log.
>>> >>>>>>>>
>>> >>>>>>>> Kind regards,
>>> >>>>>>>> Bekir
>>> >>>>>>>>
>>> >>>>>>>> <task_manager.log>
>>> >>>>>>>>
>>> >>>>>>>> <PastedGraphic-4.png>
>>> >>>>>>>> <PastedGraphic-3.png>
>>> >>>>>>>> <PastedGraphic-2.png>
>>> >>>>>>>>
>>> >>>>>>>> On 2 Aug 2019, at 07:23, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>> >>>>>>>>
>>> >>>>>>>> Hi Bekir
>>> >>>>>>>> I'll first summarize the problem here (please correct me if I'm wrong):
>>> >>>>>>>> 1. The same program running on 1.6 never encountered such problems.
>>> >>>>>>>> 2. Some checkpoints take too long to complete (15+ min), while other, normal checkpoints complete in less than 1 min.
>>> >>>>>>>> 3. Some bad checkpoints have a large sync time, while the async time seems OK.
>>> >>>>>>>> 4. For some bad checkpoints, the e2e duration is much bigger than (sync_time + async_time).
>>> >>>>>>>> First, to answer the last question: the e2e duration is ack_time - trigger_time, so it is always bigger than (sync_time + async_time), but we have a big gap here, which may be problematic.
>>> >>>>>>>> According to all the information, maybe the problem is that some task starts its checkpoint too late and the sync part of the checkpoint takes too long. Could you please share some more information, such as:
>>> >>>>>>>> - A screenshot of the summary for one bad checkpoint (we call it A here)
>>> >>>>>>>> - The detailed information of checkpoint A (including all the problematic subtasks)
>>> >>>>>>>> - The jobmanager.log and the taskmanager.log for the problematic task and a healthy task
>>> >>>>>>>> - A screenshot of the subtasks for the problematic task (including the `Bytes received`, `Records received`, `Bytes sent`, and `Records sent` columns); here I want to compare the problematic parallelism with a good parallelism. Please also share whether there is any data skew among the parallelisms.
>>> >>>>>>>> - Could you please share some jstacks of the problematic parallelism — here I want to check whether the task is too busy to handle the barrier. (A flame graph or similar is always welcome here.)
>>> >>>>>>>>
>>> >>>>>>>> Best,
>>> >>>>>>>> Congxian
>>> >>>>>>>>
>>> >>>>>>>> Congxian Qiu <qcx978132...@gmail.com> wrote on Thu, 1 Aug 2019 at 20:26:
>>> >>>>>>>>
>>> >>>>>>>>> Hi Bekir
>>> >>>>>>>>>
>>> >>>>>>>>> I'll first comb through all the information here and try to find out the reason with you; I may need you to share some more information :)
>>> >>>>>>>>>
>>> >>>>>>>>> Best,
>>> >>>>>>>>> Congxian
>>> >>>>>>>>>
>>> >>>>>>>>> Bekir Oguz <bekir.o...@persgroep.net> wrote on Thu, 1 Aug 2019 at 17:00:
>>> >>>>>>>>>
>>> >>>>>>>>>> Hi Fabian,
>>> >>>>>>>>>> Thanks for sharing this with us, but we're already on version 1.8.1.
>>> >>>>>>>>>>
>>> >>>>>>>>>> What I don't understand is which mechanism in Flink occasionally adds 15 minutes to the checkpoint duration. Can you maybe give us some hints on where to look? Is there a default timeout of 15 minutes defined somewhere in Flink? I couldn't find one.
>>> >>>>>>>>>>
>>> >>>>>>>>>> In our pipeline, most of the checkpoints complete in less than a minute, and some of them complete in 15 minutes plus less than a minute. There's definitely something which adds 15 minutes. This is happening in one or more subtasks during checkpointing.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Please see the screenshot below:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Regards,
>>> >>>>>>>>>> Bekir
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 23 Jul 2019, at 16:37, Fabian Hueske <fhue...@gmail.com> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Hi Bekir,
>>> >>>>>>>>>>
>>> >>>>>>>>>> Another user reported checkpointing issues with Flink 1.8.0 [1]. These seem to be resolved with Flink 1.8.1.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Hope this helps,
>>> >>>>>>>>>> Fabian
>>> >>>>>>>>>>
>>> >>>>>>>>>> [1] https://lists.apache.org/thread.html/991fe3b09fd6a052ff52e5f7d9cdd9418545e68b02e23493097d9bc4@%3Cuser.flink.apache.org%3E
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Wed, 17 Jul 2019 at 09:16, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Hi Bekir
>>> >>>>>>>>>>
>>> >>>>>>>>>> First of all, I think there is something wrong: the state size is almost the same, but the durations differ so much.
>>> >>>>>>>>>>
>>> >>>>>>>>>> A checkpoint for RocksDBStateBackend dumps sst files, then copies the needed sst files (if you enable incremental checkpointing, the sst files already on the remote storage will not be uploaded again), then completes the checkpoint. Can you check the network bandwidth usage during the checkpoint?
>>> >>>>>>>>>>
>>> >>>>>>>>>> Best,
>>> >>>>>>>>>> Congxian
>>> >>>>>>>>>>
>>> >>>>>>>>>> Bekir Oguz <bekir.o...@persgroep.net> wrote on Tue, 16 Jul 2019 at 22:45:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Hi all,
>>> >>>>>>>>>> We have a Flink job with user state, checkpointing to RocksDBStateBackend, which is externally stored in AWS S3.
>>> >>>>>>>>>> After we migrated our cluster from 1.6 to 1.8, we occasionally see that some slots do not acknowledge the checkpoints quickly enough. As an example: all slots acknowledge within 30-50 seconds except one slot, which acknowledges in 15 mins. Checkpoint sizes are similar to each other, around 200-400 MB.
>>> >>>>>>>>>>
>>> >>>>>>>>>> We did not experience this weird behaviour in Flink 1.6. We have a 5 min checkpoint interval, and this happens sometimes once an hour, sometimes more often, but not in all the checkpoint requests. Please see the screenshot below.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Also another point: for the faulty slots, the duration is consistently 15 mins and some seconds; we couldn't find out where this 15 mins response time comes from. And each time it is a different task manager, not always the same one.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Are you guys aware of any other users having similar issues with the new version, and is there a suggested bug fix or solution?
>>> >>>>>>>>>>
>>> >>>>>>>>>> Thanks in advance,
>>> >>>>>>>>>> Bekir Oguz
>>> >>>>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>>
>>> >>
>>> >> --
>>> >> -- Bekir Oguz
>>> >>
>>> >
>>>
>>
>
> --
> -- Bekir Oguz
>