Sorry, but I just wanted to confirm: does the "at-least-once" delivery assertion hold true if there is a dangling pending file?
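Further down the thread, Aljoscha suggests periodically cleaning up older pending files yourself, because Flink does not. The sketch below shows what such a job might look like. It is a minimal sketch under several assumptions:

- It walks one bucket directory on a locally accessible filesystem via `pathlib`. A real job against HDFS would need an HDFS client instead.
- It assumes the default `_` prefix and `.pending` suffix seen in the listings.
- It treats a pending file as stale once it is older than a hypothetical `max_age_seconds`.
- It defaults to a dry run.

All function names are illustrative, not part of Flink. The helper `hidden_from_hadoop` reflects the convention Vishal relies on: Hadoop input formats skip names starting with `_` (or `.`), so pending files stay invisible to Hive or MR.

```python
import time
from pathlib import Path
from typing import List, Optional


def hidden_from_hadoop(name: str) -> bool:
    # Hadoop input formats conventionally skip names starting with "_" or ".",
    # which is why in-progress/pending part files are not read by Hive or MR.
    return name.startswith(("_", "."))


def stale_pending_files(bucket_dir: Path, max_age_seconds: float,
                        now: Optional[float] = None) -> List[Path]:
    """List pending part files in bucket_dir not modified for max_age_seconds."""
    now = time.time() if now is None else now
    stale = []
    for path in sorted(bucket_dir.iterdir()):
        if not (path.is_file() and path.name.endswith(".pending")):
            continue
        if not hidden_from_hadoop(path.name):
            continue  # not named like a sink-managed pending file; leave it alone
        if now - path.stat().st_mtime > max_age_seconds:
            stale.append(path)
    return stale


def cleanup_stale_pending(bucket_dir: Path, max_age_seconds: float,
                          dry_run: bool = True,
                          now: Optional[float] = None) -> List[Path]:
    """Delete (or, with dry_run, only report) stale pending files."""
    stale = stale_pending_files(bucket_dir, max_age_seconds, now)
    if not dry_run:
        for path in stale:
            path.unlink()
    return stale
```

The delicate part is choosing `max_age_seconds`. As Aljoscha explains, a pending file may still be moved to "final" when some parallel instance recovers from a checkpoint. The threshold should therefore comfortably exceed how far back you might ever restore from.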
On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> That is fine, as long as Flink assures at-least-once semantics?
>
> If the contents of a .pending file are, through the turbulence (restarts etc.), assured to be in another file, then anything starting with "_" (underscore) will by default be ignored by Hadoop (Hive or MR etc.).
>
> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>>
>> Sorry for the confusion. The framework (Flink) currently does not do any cleanup of pending files, yes.
>>
>> Best,
>> Aljoscha
>>
>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.
>>
>> A little confused. Is that what the framework should do, or should we do it ourselves as part of some cleanup job?
>>
>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you can actually delete and which you have to leave because they will get moved to "final" as part of recovering from a checkpoint on some other parallel instance of the sink.
>>>
>>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> Hi Vishal,
>>>
>>> the pending files should indeed eventually get finalized. This happens on a checkpoint complete notification. Thus, what you report does not seem right.
>>> Maybe Aljoscha can shed a bit more light on the problem.
>>>
>>> In order to further debug the problem, it would be really helpful to get access to the DEBUG log files of a TM which runs the BucketingSink.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> I have the same concern about savepointing in BucketingSink.
>>>> As for your question, I think that before the pending files get cleared in handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>>
>>>> I'm looking into this part of the source code now, since we are experiencing some unclosed files after checkpointing.
>>>> It would be great if you could share more if you find something new about your problem, which might help with ours.
>>>>
>>>> Best regards,
>>>> Mu
>>>>
>>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>>>> -rw-r--r-- 3 root hadoop 54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>>> -rw-r--r-- 3 root hadoop 11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>>>
>>>>> This is strange. We had a few retries because of an OOM on one of the TMs, and we see this situation: 2 files (on either side) that were dealt with fine, but a dangling .pending file. I am sure this is not what is meant to be. I think we have an edge condition, and looking at the code it is not obvious. Maybe someone who wrote the code can shed some light on how this can happen.
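Till and Mu say that pending files are finalized on the checkpoint-complete notification. Aljoscha adds that dangling pending files only appear after a failure-recovery cycle. A toy model can make both points concrete.

This is a deliberately simplified, hypothetical sketch, not BucketingSink's actual code. The class and method names are invented, it ignores in-progress files and valid-length handling, and it assumes the default `_`/`.in-progress`/`.pending` naming seen in the listings.

In the model, a part file that is both opened and closed after the last snapshot is not yet recorded in any checkpoint. If the job fails before the next snapshot, recovery restarts from an earlier checkpoint that knows nothing about the file, so nothing ever renames it.

```python
from dataclasses import dataclass, field
from typing import Dict, List


def final_name(pending_name: str) -> str:
    # "_part-0-18.pending" -> "part-0-18" (assumes the default prefix/suffix).
    if pending_name.endswith(".pending"):
        pending_name = pending_name[: -len(".pending")]
    return pending_name[1:] if pending_name.startswith("_") else pending_name


@dataclass
class ToyBucketState:
    """Hypothetical, simplified model of pending-file bookkeeping in a sink."""
    pending_since_snapshot: List[str] = field(default_factory=list)
    pending_per_checkpoint: Dict[int, List[str]] = field(default_factory=dict)
    finalized: List[str] = field(default_factory=list)

    def close_part_file(self, in_progress: str) -> str:
        # Closing a part file renames "*.in-progress" to "*.pending".
        pending = in_progress.replace(".in-progress", ".pending")
        self.pending_since_snapshot.append(pending)
        return pending

    def snapshot(self, checkpoint_id: int) -> None:
        # Files that became pending since the last snapshot are attributed
        # to this checkpoint and become part of its state.
        self.pending_per_checkpoint[checkpoint_id] = self.pending_since_snapshot
        self.pending_since_snapshot = []

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        # Finalize pending files recorded for this checkpoint or any older one.
        for cid in sorted(self.pending_per_checkpoint):
            if cid <= checkpoint_id:
                self.finalized.extend(
                    final_name(p) for p in self.pending_per_checkpoint.pop(cid))
```

Suppose `_part-0-19.pending` is closed after checkpoint 1 completes, and the job then fails before `snapshot(2)`. It stays in `pending_since_snapshot`, which the restored state does not contain, so it is never finalized. Assuming a replayable source, its records are reprocessed after recovery and land in a new part file. That is the premise behind the at-least-once question at the top of the thread.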
>>>>>
>>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Without --allowNonRestoredState, on a suspend/resume we do see the length file along with the finalized file (finalized during resume):
>>>>>>
>>>>>> -rw-r--r-- 3 root hadoop 10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>>
>>>>>> That does make much more sense.
>>>>>>
>>>>>> I guess we should document --allowNonRestoredState better? It seems it actually drops state?
>>>>>>
>>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> This is 1.4, BTW. I am not sure that I am reading this correctly, but the lifecycle of cancel/resume is 2 steps:
>>>>>>>
>>>>>>> 1. Cancel job with SP
>>>>>>>
>>>>>>> closeCurrentPartFile
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>>
>>>>>>> is called from close()
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>>
>>>>>>> and that moves files to pending state. That, I would presume, is called when one does a cancel.
>>>>>>>
>>>>>>> 2. The restore on resume
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>>
>>>>>>> calls
>>>>>>>
>>>>>>> handleRestoredBucketState
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>>
>>>>>>> which clears the pending files from state without finalizing them?
>>>>>>>
>>>>>>> That does not seem to be right. I must be reading the code totally wrong?
>>>>>>>
>>>>>>> I am also not sure whether --allowNonRestoredState is skipping getting the state. At least https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints is not exactly clear about what it does if we add an operator (GDF, I think, will add a new operator in the DAG without state even if stateful; in my case the Map operator is not even stateful).
>>>>>>>
>>>>>>> Thanks, and please bear with me if this is all something pretty simple.
>>>>>>>
>>>>>>> Vishal
>>>>>>>
>>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> What should be the behavior of BucketingSink vis-à-vis state (pending, in-progress and finalization) when we suspend and resume?
>>>>>>>>
>>>>>>>> So I did this:
>>>>>>>>
>>>>>>>> * I had a pipe writing to HDFS, which I suspended and resumed using --allowNonRestoredState, as I had added a harmless (stateless) MapOperator.
>>>>>>>>
>>>>>>>> * I see that a file on HDFS, the file being written to (before the cancel with savepoint), goes into a pending state: _part-0-21.pending
>>>>>>>>
>>>>>>>> * I see a new file being written to in the resumed pipe: _part-0-22.in-progress.
>>>>>>>>
>>>>>>>> What I do not see is _part-0-21.pending being finalized (as in renamed to just part-0-21). I would have assumed that would be the case in this controlled suspend/resume circumstance. Furthermore, it is a rename, and an HDFS mv is not an expensive operation.
>>>>>>>>
>>>>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Vishal
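The suspend/resume lifecycle in the first message of the thread (and in the 1:37 PM follow-up) can be sketched the same way. This is a hypothetical toy model loosely based on the thread's reading of the code, not Flink's implementation:

- A set of file names stands in for the HDFS bucket directory, and the default file naming is assumed.
- `cancel_with_savepoint` records the open part file in the savepoint state, then mimics `close()` by renaming it to pending.
- `resume` finalizes the recorded file only if the sink's state is actually restored.

Passing `None` models the case Vishal suspects with `--allowNonRestoredState`: the sink starts without its state, and the pending file is left behind.

```python
from typing import Dict, Optional, Set


def pending_name(in_progress: str) -> str:
    return in_progress.replace(".in-progress", ".pending")


def finalized_name(in_progress: str) -> str:
    # "_part-0-21.in-progress" -> "part-0-21" (assumes the default naming).
    name = in_progress.replace(".in-progress", "")
    return name[1:] if name.startswith("_") else name


def cancel_with_savepoint(files: Set[str], in_progress: str) -> Dict[str, str]:
    """Take a savepoint that records the open part file, then close it."""
    state = {"current_file": in_progress}
    # close(): the in-progress file is renamed to pending.
    files.discard(in_progress)
    files.add(pending_name(in_progress))
    return state


def resume(files: Set[str], restored: Optional[Dict[str, str]]) -> None:
    """Restart the sink; finalize the recorded part file only if state is restored."""
    if restored is None:
        return  # no sink state: nothing knows the pending file exists
    current = restored["current_file"]
    # The file may still be in-progress or may already have been moved to pending.
    for candidate in (current, pending_name(current)):
        if candidate in files:
            files.remove(candidate)
            files.add(finalized_name(current))
            break
```

In the `None` case, `_part-0-21.pending` stays untouched while the resumed job starts writing a new part file, which matches what Vishal observed.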