>> You should only have these dangling pending files after a
failure-recovery cycle, as you noticed. My suggestion would be to
periodically clean up older pending files.

I'm a little confused. Is that something the framework should do, or
something we should do ourselves as part of a cleanup job?
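
If this ends up being on our side, I imagine the cleanup job would look
roughly like the sketch below. It is only an illustration against the plain
Hadoop FileSystem API; the bucket directory and the two-day threshold are
made up, and deleting (rather than finalizing) only makes sense for pending
files old enough that no restore could still move them to "final".

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Rough sketch: periodically delete stale .pending files left behind
    // after failure-recovery cycles.
    public class PendingFileCleanup {

        public static void main(String[] args) throws Exception {
            // Hypothetical bucket directory and retention window.
            Path bucketDir = new Path("hdfs:///kraken/sessions_uuid_csid_v3/dt=2018-02-14");
            long maxAgeMs = TimeUnit.DAYS.toMillis(2);

            FileSystem fs = FileSystem.get(bucketDir.toUri(), new Configuration());
            long now = System.currentTimeMillis();

            for (FileStatus status : fs.listStatus(bucketDir)) {
                String name = status.getPath().getName();
                // Only touch dangling pending files that are clearly older
                // than any checkpoint/savepoint that could still finalize them.
                if (name.endsWith(".pending")
                        && now - status.getModificationTime() > maxAgeMs) {
                    fs.delete(status.getPath(), false);
                }
            }
            fs.close();
        }
    }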





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> The BucketingSink does not clean up pending files on purpose. In a
> distributed setting, and especially with rescaling of Flink operators, it
> is sufficiently hard to figure out which of the pending files you actually
> can delete and which of them you have to leave because they will get moved
> to "final" as part of recovering from a checkpoint on some other parallel
> instance of the sink.
>
> You should only have these dangling pending files after a failure-recovery
> cycle, as you noticed. My suggestion would be to periodically clean up
> older pending files.
>
> Best,
> Aljoscha
>
>
> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Vishal,
>
> The pending files should indeed eventually get finalized. This happens on
> a checkpoint-complete notification. Thus, what you report does not seem
> right. Maybe Aljoscha can shed a bit more light on the problem.
>
> In order to further debug the problem, it would be really helpful to get
> access to DEBUG log files of a TM which runs the BucketingSink.
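>
> For example, something along these lines in the TaskManager's
> log4j.properties should raise the sink's log level (the logger name below
> is derived from the connector's package and is only meant as an
> illustration):
>
>     log4j.logger.org.apache.flink.streaming.connectors.fs.bucketing=DEBUG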
>
> Cheers,
> Till
>
> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu....@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> I have the same concern about savepointing in BucketingSink.
>> As for your question, I think the pending files are finalized in
>> notifyCheckpointComplete before they get cleared in
>> handleRestoredBucketState:
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>
>> I'm looking into this part of the source code now, since we are
>> experiencing some unclosed files after checkpointing.
>> It would be great if you could share whatever you find out about your
>> problem, as it might help with ours as well.
>>
>> Best regards,
>> Mu
>>
>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48
>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17
>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>
>>>
>>> This is strange: we had a few retries because of an OOM on one of the TMs
>>> and ended up in this situation. The two files on either side were dealt
>>> with fine, but there is a dangling .pending file in between. I am sure this
>>> is not what is meant to happen. I think we have hit an edge condition, and
>>> looking at the code it is not obvious how. Maybe someone who wrote the code
>>> can shed some light on how this can happen.
>>>
>>>
>>>
>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Without --allowNonRestoredState, on a suspend/resume we do see the
>>>> length file along with the finalized file (finalized during resume):
>>>>
>>>> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57
>>>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>
>>>> That makes much more sense.
>>>>
>>>> I guess we should document --allowNonRestoredState better? It seems
>>>> it actually drops state?
>>>>
>>>>
>>>>
>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> This is 1.4, BTW. I am not sure that I am reading this correctly, but
>>>>> the lifecycle of cancel/resume is two steps:
>>>>>
>>>>>
>>>>>
>>>>> 1. Cancel job with SP
>>>>>
>>>>>
>>>>> closeCurrentPartFile
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>
>>>>> is called from close()
>>>>>
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>
>>>>>
>>>>> and that moves the files to the pending state. That, I would presume,
>>>>> is what happens when one does a cancel.
>>>>>
>>>>>
>>>>>
>>>>> 2. The restore on resume
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>
>>>>> calls
>>>>>
>>>>> handleRestoredBucketState
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>
>>>>> which clears the pending files from state without finalizing them?
>>>>>
>>>>>
>>>>>
>>>>> That does not seem right. Am I reading the code totally wrong?
>>>>>
>>>>> I am also not sure whether --allowNonRestoredState skips restoring the
>>>>> state altogether. At least
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>>>> is not exactly clear about what it does if we add an operator (GDF, I
>>>>> think, will add a new operator to the DAG without state even if it is
>>>>> stateful; in my case the Map operator is not even stateful).
>>>>>
>>>>>
>>>>> Thanks and please bear with me if this is all something pretty simple.
>>>>>
>>>>> Vishal
>>>>>
>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> What should be the behavior of BucketingSink vis-a-vis state (pending,
>>>>>> in-progress, and finalization) when we suspend and resume?
>>>>>>
>>>>>> So I did this
>>>>>>
>>>>>> * I suspended and resumed a pipe writing to HDFS using
>>>>>> --allowNonRestoredState, since I had added a harmless (stateless) Map
>>>>>> operator (roughly the command sequence sketched after this list).
>>>>>>
>>>>>> * I see that the file on HDFS that was being written to (before the
>>>>>> cancel with savepoint) goes into a pending state:
>>>>>> _part-0-21.pending
>>>>>>
>>>>>> * I see a new file being written to in the resumed pipe:
>>>>>> _part-0-22.in-progress.
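>>>>>>
>>>>>> For reference, the suspend/resume was essentially the following (the job
>>>>>> id, jar, and savepoint paths are placeholders, and the flags are as I
>>>>>> understand the 1.4 CLI, so please correct me if they are off):
>>>>>>
>>>>>>     bin/flink cancel -s hdfs:///savepoints <jobId>
>>>>>>     bin/flink run -s <savepointPath> -n my-pipeline.jar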
>>>>>>
>>>>>> What I do not see is _part-0-21.pending being finalized (as in renamed
>>>>>> to just part-0-21). I would have assumed that would be the case in this
>>>>>> controlled suspend/resume circumstance. Further, it is just a rename, and
>>>>>> an hdfs mv is not an expensive operation.
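>>>>>>
>>>>>> That is, something as cheap as this (the bucket path is a placeholder,
>>>>>> just to illustrate what finalizing amounts to):
>>>>>>
>>>>>>     hdfs dfs -mv <bucket-path>/_part-0-21.pending <bucket-path>/part-0-21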
>>>>>>
>>>>>>
>>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Vishal
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
