>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.
A little confused. Is that something the framework should do, or something we should do ourselves as part of a cleanup job?

On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you can actually delete and which of them you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.
>
> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.
>
> Best,
> Aljoscha
>
> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Vishal,
>
> The pending files should indeed get finalized eventually. This happens on a checkpoint-complete notification. Thus, what you report does not seem right. Maybe Aljoscha can shed a bit more light on the problem.
>
> In order to further debug the problem, it would be really helpful to get access to DEBUG log files of a TM which runs the BucketingSink.
>
> Cheers,
> Till
>
> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> I have the same concern about savepointing in BucketingSink.
>> As for your question, I think that before the pending files get cleared in handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>
>> I'm looking into this part of the source code now, since we are experiencing some unclosed files after checkpointing.
>> It would be great if you could share more if you find something new about your problem, which might help with ours.
>>
>> Best regards,
>> Mu
>>
>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>> -rw-r--r-- 3 root hadoop 54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>
>>> This is strange. We had a few retries because of an OOM on one of the TMs, and we see this situation: the two files on either side were dealt with fine, but there is a dangling .pending file in between. I am sure this is not what is meant to be. I think we have hit an edge condition, and looking at the code it is not obvious. Maybe someone who wrote the code can shed some light on how this can happen.
>>>
>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file (finalized during resume):
>>>>
>>>> -rw-r--r-- 3 root hadoop 10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>
>>>> That makes much more sense.
>>>>
>>>> I guess we should document --allowNonRestoredState better? It seems it actually drops state?
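For reference, the "periodically clean up older pending files" suggestion at the top of the thread could be implemented as a small external job along these lines. This is only a sketch: the base path argument, the one-level date-bucketed layout, and the one-day age threshold are assumptions made for illustration, not anything BucketingSink provides, and whether a given .pending file is really safe to delete still depends on your checkpointing and recovery setup.

// Rough sketch of an external cleanup job for dangling .pending files.
// Assumptions (not from BucketingSink itself): buckets are one level of
// date directories under the base path passed as args[0], and any pending
// file older than one day is considered safe to delete.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PendingFileCleanup {

    public static void main(String[] args) throws IOException {
        Path basePath = new Path(args[0]);  // hypothetical base path, e.g. the sink's output directory
        long cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000;

        FileSystem fs = basePath.getFileSystem(new Configuration());
        for (FileStatus bucket : fs.listStatus(basePath)) {      // e.g. dt=... bucket directories
            if (!bucket.isDirectory()) {
                continue;
            }
            for (FileStatus file : fs.listStatus(bucket.getPath())) {
                boolean stalePending = file.getPath().getName().endsWith(".pending")
                        && file.getModificationTime() < cutoff;
                if (stalePending) {
                    fs.delete(file.getPath(), false);            // drop the dangling pending file
                }
            }
        }
        fs.close();
    }
}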
>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> This is 1.4, BTW. I am not sure that I am reading this correctly, but the lifecycle of cancel/resume is 2 steps:
>>>>>
>>>>> 1. Cancel job with SP.
>>>>>
>>>>> closeCurrentPartFile
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>> is called from close()
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>> and that moves files to the pending state. That, I would presume, is called when one does a cancel.
>>>>>
>>>>> 2. The restore on resume
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>> calls handleRestoredBucketState
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>> which clears the pending files from state without finalizing them?
>>>>>
>>>>> That does not seem to be right. I must be reading the code totally wrong?
>>>>>
>>>>> I am also not sure whether --allowNonRestoredState is skipping getting the state. At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear on what it does if we add an operator (GDF I think will add a new operator to the DAG without state even if stateful; in my case the Map operator is not even stateful).
>>>>>
>>>>> Thanks, and please bear with me if this is all something pretty simple.
>>>>>
>>>>> Vishal
>>>>>
>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> What should be the behavior of BucketingSink vis-a-vis state (pending, in-progress and finalization) when we suspend and resume?
>>>>>>
>>>>>> So I did this:
>>>>>>
>>>>>> * I had a pipe writing to HDFS that I suspended and resumed using --allowNonRestoredState, since I had added a harmless, stateless Map operator.
>>>>>>
>>>>>> * I see that the file on HDFS that was being written to (before the cancel with savepoint) goes into a pending state: _part-0-21.pending
>>>>>>
>>>>>> * I see a new file being written to in the resumed pipe: _part-0-22.in-progress.
>>>>>>
>>>>>> What I do not see is _part-0-21.pending being finalized (as in renamed to just part-0-21). I would have assumed that would be the case in this controlled suspend/resume circumstance. Further, it is a rename, and an hdfs mv is not an expensive operation.
>>>>>>
>>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Vishal
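To make the mechanics discussed in the thread easier to follow, here is a stripped-down sketch of the general pattern being described: pending-to-final renames are driven by the checkpoint-complete notification (CheckpointListener), not by close(). This is not the actual BucketingSink code; the class name, fields, and the omitted rename helper are invented for illustration, and the real implementation also handles operator state, restore, truncation, and valid-length files.

// Illustrative skeleton only: a sink that defers pending -> final renames
// to notifyCheckpointComplete, which is why a file can be left in .pending
// if the job stops before that notification arrives.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PendingFileSinkSketch<T> extends RichSinkFunction<T>
        implements CheckpointedFunction, CheckpointListener {

    // files rolled to ".pending", grouped by the checkpoint that covers them
    // (a real sink would keep this bookkeeping in operator state, omitted here)
    private final Map<Long, List<String>> pendingPerCheckpoint = new HashMap<>();
    private final List<String> pendingSinceLastCheckpoint = new ArrayList<>();

    @Override
    public void invoke(T value) {
        // write value to the current in-progress part file (omitted)
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // the in-progress file is rolled over to ".pending" here; the rename
        // to the final name is deferred until the checkpoint completes
        pendingPerCheckpoint.put(context.getCheckpointId(),
                new ArrayList<>(pendingSinceLastCheckpoint));
        pendingSinceLastCheckpoint.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // on restore, pending files belonging to already-completed checkpoints
        // would be finalized here (omitted)
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // only now is it safe to rename ".pending" files to their final names;
        // if the job goes away before this call, the files stay pending
        List<String> files = pendingPerCheckpoint.remove(checkpointId);
        if (files != null) {
            for (String file : files) {
                // renamePendingToFinal(file);  // hypothetical helper, omitted
            }
        }
    }
}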