>> After the savepoint state has been written, the sink might start new .in-progress files. These files are not part of the savepoint but are renamed to .pending in close().
>> On restore, all pending files that are part of the savepoint are moved into their final state (and possibly truncated). See the handlePendingInProgressFiles() method.
>> Pending files that are not part of the savepoint (because they were created between taking the savepoint and shutting the job down) are not touched and remain as .pending files.
>> These should be the .pending files that you observe. Since they contain data that is not part of the savepoint, it should be safe to delete them.
>> If you keep them, you will have at-least-once output.

Is the above also possible with an in-progress file? I had a situation where we saw such a dangling file after a restart on error.

On Wed, Feb 21, 2018 at 8:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Thank you Fabian,
>
> What is more important (and I think you might have addressed it in your post, so sorry for being a little obtuse) is that deleting them does not violate "at-least-once" delivery. If that is definite, then we are fine with it, though we will test it further.
>
> Thanks and Regards.
>
> On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vishal, hi Mu,
>>
>> After the savepoint state has been written, the sink might start new .in-progress files. These files are not part of the savepoint but are renamed to .pending in close().
>> On restore, all pending files that are part of the savepoint are moved into their final state (and possibly truncated). See the handlePendingInProgressFiles() method.
>> Pending files that are not part of the savepoint (because they were created between taking the savepoint and shutting the job down) are not touched and remain as .pending files.
>>
>> These should be the .pending files that you observe. Since they contain data that is not part of the savepoint, it should be safe to delete them.
>> If you keep them, you will have at-least-once output.
>>
>> Best, Fabian
>>
>> 2018-02-21 5:04 GMT+01:00 Mu Kong <kong.mu....@gmail.com>:
>>
>>> Hi Aljoscha,
>>>
>>> Thanks for confirming the fact that Flink doesn't clean up pending files.
>>> Is it safe to clean (remove) all the pending files after cancel (with or without savepointing) or failover?
>>> If we do that, will we lose some data?
>>>
>>> Thanks!
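Since the thread establishes that dangling .pending files not covered by the savepoint are safe to delete after a grace period, a periodic cleanup could look roughly like the sketch below. It uses java.nio against a local directory purely for illustration; a real job would use Hadoop's FileSystem API against HDFS, and the class name, glob pattern, and grace period are all assumptions, not anything Flink ships.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class PendingFileCleaner {

    /**
     * Deletes files matching "_part-*.pending" in bucketDir whose last
     * modification is older than maxAgeHours. The age check is the safety
     * margin: a sink that is still running finalizes its own pending files
     * shortly after the next checkpoint completes, so only old files are
     * truly dangling.
     */
    public static int clean(Path bucketDir, long maxAgeHours) throws IOException {
        Instant cutoff = Instant.now().minus(maxAgeHours, ChronoUnit.HOURS);
        int deleted = 0;
        try (DirectoryStream<Path> pending =
                 Files.newDirectoryStream(bucketDir, "_part-*.pending")) {
            for (Path file : pending) {
                if (Files.getLastModifiedTime(file).toInstant().isBefore(cutoff)) {
                    Files.delete(file);
                    deleted++;
                }
            }
        }
        return deleted;
    }
}
```

Note that deleting such files keeps at-least-once output only under the condition stated above: their data was not part of the restored savepoint and is therefore replayed into new part files.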
>>> Best,
>>> Mu
>>>
>>> On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Sorry, but I just wanted to confirm: is the assertion of "at-least-once" delivery true if there is a dangling pending file?
>>>>
>>>> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> That is fine, as long as Flink assures at-least-once semantics?
>>>>>
>>>>> If the contents of a .pending file are, through the turbulence (restarts etc.), assured to be in another file, then anything starting with an underscore ("_") will by default be ignored by Hadoop (Hive or MR etc.).
>>>>>
>>>>> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Sorry for the confusion. The framework (Flink) does currently not do any cleanup of pending files, yes.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>> >> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.
>>>>>>
>>>>>> A little confused. Is that what the framework should do, or us as part of some cleanup job?
>>>>>>
>>>>>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you can actually delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.
>>>>>>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> The pending files should indeed get finalized eventually. This happens on a checkpoint-complete notification. Thus, what you report does not seem right. Maybe Aljoscha can shed a bit more light on the problem.
>>>>>>>
>>>>>>> In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Vishal,
>>>>>>>>
>>>>>>>> I have the same concern about savepointing in BucketingSink.
>>>>>>>> As for your question, I think that before the pending files get cleared in handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>>>>>>
>>>>>>>> I'm looking into this part of the source code now, since we are experiencing some unclosed files after checkpointing.
>>>>>>>> It would be great if you could share more if you find something new about your problem, which might help with ours.
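Till's point that pending files are finalized on a checkpoint-complete notification can be pictured with a toy model. This is an illustration of the general pattern, not BucketingSink's actual fields or methods: files that became pending during a checkpoint are finalized only once that checkpoint (or a later one) is acknowledged as complete, which is exactly why a failure before the notification leaves them dangling.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of finalize-on-checkpoint-complete in a bucketing sink. */
public class PendingFinalizer {

    // checkpointId -> files that became .pending during that checkpoint.
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> finalized = new ArrayList<>();

    void moveToPending(long checkpointId, String file) {
        pendingPerCheckpoint
            .computeIfAbsent(checkpointId, k -> new ArrayList<>())
            .add(file);
    }

    /** Called when checkpointId has completed on all tasks: finalize every
     *  pending file belonging to this or an earlier checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
            pendingPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                finalized.addAll(entry.getValue()); // stands in for the .pending -> final rename
                it.remove();
            }
        }
    }

    List<String> finalized() { return finalized; }

    int outstandingCheckpoints() { return pendingPerCheckpoint.size(); }
}
```

In this model, a crash between moveToPending and notifyCheckpointComplete leaves entries in pendingPerCheckpoint forever, matching the dangling .pending files observed in the thread.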
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Mu
>>>>>>>>
>>>>>>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> -rw-r--r--   3 root hadoop       11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>>>>>>>> -rw-r--r--   3 root hadoop 54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>>>>>>> -rw-r--r--   3 root hadoop       11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>>>>>>>
>>>>>>>>> This is strange. We had a few retries because of an OOM on one of the TMs, and we see this situation: two files (on either side) that were dealt with fine, but a dangling .pending file in between. I am sure this is not what is meant to happen. I think we have an edge condition, and looking at the code it is not obvious. Maybe someone who wrote the code can shed some light on how this can happen.
>>>>>>>>>
>>>>>>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file (finalized during resume):
>>>>>>>>>>
>>>>>>>>>> -rw-r--r--   3 root hadoop       10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>>>>>>
>>>>>>>>>> That makes much more sense.
>>>>>>>>>>
>>>>>>>>>> I guess we should document --allowNonRestoredState better? It seems it actually drops state?
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is 1.4, BTW.
>>>>>>>>>>> I am not sure that I am reading this correctly, but the lifecycle of cancel/resume is 2 steps:
>>>>>>>>>>>
>>>>>>>>>>> 1. Cancel job with SP.
>>>>>>>>>>>
>>>>>>>>>>> closeCurrentPartFile
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>>>>>>
>>>>>>>>>>> is called from close()
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>>>>>>
>>>>>>>>>>> and that moves files to the pending state. That, I would presume, is called when one does a cancel.
>>>>>>>>>>>
>>>>>>>>>>> 2. The restore on resume
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>>>>>>
>>>>>>>>>>> calls
>>>>>>>>>>>
>>>>>>>>>>> handleRestoredBucketState
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>>>>>>
>>>>>>>>>>> which clears the pending files from state without finalizing them?
>>>>>>>>>>>
>>>>>>>>>>> That does not seem right. Am I reading the code totally wrong?
>>>>>>>>>>>
>>>>>>>>>>> I am also not sure whether --allowNonRestoredState is skipping getting the state. At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear about what it does if we add an operator (GDF I think will add a new operator in the DAG without state even if stateful; in my case the Map operator is not even stateful).
>>>>>>>>>>>
>>>>>>>>>>> Thanks, and please bear with me if this is all something pretty simple.
>>>>>>>>>>>
>>>>>>>>>>> Vishal
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What should be the behavior of BucketingSink vis-a-vis state (pending, in-progress, and finalization) when we suspend and resume?
>>>>>>>>>>>>
>>>>>>>>>>>> So I did this:
>>>>>>>>>>>>
>>>>>>>>>>>> * I had a pipe writing to HDFS; suspend and resume using --allowNonRestoredState, as I had added a harmless Map operator (stateless).
>>>>>>>>>>>>
>>>>>>>>>>>> * I see that a file on HDFS, the file being written to (before the cancel with savepoint), goes into a pending state: _part-0-21.pending
>>>>>>>>>>>>
>>>>>>>>>>>> * I see a new file being written to in the resumed pipe: _part-0-22.in-progress.
>>>>>>>>>>>>
>>>>>>>>>>>> What I do not see is the file _part-0-21.pending being finalized (as in renamed to just part-0-21). I would have assumed that would be the case in this controlled suspend/resume circumstance. Further, it is a rename, and an hdfs mv is not an expensive operation.
>>>>>>>>>>>>
>>>>>>>>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Vishal
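The lifecycle debated above, per Fabian's summary, can be condensed into a toy state model. The class and method names below are illustrative, not BucketingSink's API: close() renames in-progress files to .pending, and on restore only pending files recorded in the savepoint are moved to their final state, so files that became pending after the savepoint remain dangling.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Toy model of the part-file lifecycle around cancel-with-savepoint. */
public class PartFileLifecycle {

    enum State { IN_PROGRESS, PENDING, FINAL }

    private final Map<String, State> files = new LinkedHashMap<>();
    private final Set<String> savepoint = new HashSet<>();

    /** The sink opens a new part file as .in-progress. */
    void open(String name) { files.put(name, State.IN_PROGRESS); }

    /** The savepoint records the files the sink knows about at this point. */
    void takeSavepoint() { savepoint.addAll(files.keySet()); }

    /** close() renames every in-progress file to .pending. */
    void close() {
        files.replaceAll((n, s) -> s == State.IN_PROGRESS ? State.PENDING : s);
    }

    /** On restore, only pending files that are part of the savepoint are
     *  finalized; files created after the savepoint stay .pending. */
    void restore() {
        files.replaceAll((n, s) ->
            s == State.PENDING && savepoint.contains(n) ? State.FINAL : s);
    }

    State stateOf(String name) { return files.get(name); }
}
```

Running the scenario from the thread through this model, a file opened after the savepoint ends up stuck in PENDING after close() and restore(), which is the dangling .pending file observed; since its data is replayed into a new part file on resume, deleting it preserves at-least-once output.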