No problem!

On Wed, Feb 6, 2019 at 6:38 PM Kaustubh Rudrawar <kaust...@box.com> wrote:

> Hi Kostas,
>
> Thanks for the response! Yes - I see the commitAfterRecovery being called
> when a Bucket is restored. I confused myself in thinking that
> 'onSuccessfulCompletionOfCheckpoint' is called on restore as well, which
> led me to believe that we were only calling commit and not
> commitAfterRecovery.
>
> Thanks for the clarification!
> -Kaustubh
>
> On Wed, Feb 6, 2019 at 2:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi Kaustubh,
>>
>> Your general understanding is correct.
>>
>> In this case though, the sink will call the
>> S3Committer#commitAfterRecovery() method.
>> After failing to commit the MPU, this method checks whether the file
>> is there and whether its length is correct. If everything is ok (which
>> is the case in your example), it continues with normal execution.
>>
>> I hope this helps.
>>
>> Kostas
>>
>> On Wed, Feb 6, 2019 at 7:47 AM Kaustubh Rudrawar <kaust...@box.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to understand the exactly-once semantics of the
>>> StreamingFileSink with S3 in Flink 1.7.1 and am a bit confused about how
>>> it guarantees exactly-once under a very specific failure scenario.
>>>
>>> For simplicity, let's say we roll the current part file on checkpoint
>>> (and only on checkpoint). The process is as follows:
>>> 1. Framework tells the sink to prepare for a checkpoint. This ultimately
>>> results in 'onReceptionOfCheckpoint' being called on Bucket.java.
>>> 2. This takes the current file and, based on our roll policy of rolling
>>> on checkpoint, closes it and uploads it to S3 as part of an MPU. The
>>> reference to this upload is stored as part of 'pendingPartsPerCheckpoint'.
>>> 3. Once the checkpoint successfully completes, the bucket is notified
>>> via 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes
>>> through all pendingPartsPerCheckpoint and for each of them: recovers the in
>>> progress part (which doesn't exist in this scenario) and then commits the
>>> upload.
>>> 4. The AmazonS3Client is ultimately called to perform the upload and it
>>> retries the attempt up to N times. If it exhausts retries, it will throw an
>>> Exception.
>>> 5. Upon successful commit of the MPU, Bucket clears out its references
>>> to these uploads from its state.
>>>
>>> Given this flow, I'm having trouble understanding how the following
>>> scenario works:
>>>
>>>    - Step 4: The commit on the MPU succeeds,
>>>    - Step 5: Before this step completes, the task crashes. So at this
>>>    point, S3 has successfully completed the MPU but to the client (the
>>>    Flink job), it has not completed.
>>>    - Flink will then recover from the checkpoint we just took and steps
>>>    3 and 4 will be repeated. My understanding is that, since the MPU
>>>    succeeded previously, any attempts at re-committing that upload will
>>>    result in a 404 ('NoSuchUpload'). So Step 4 should throw an exception.
>>>    Which would then get retried by the framework and this process
>>>    repeats itself.
>>>
>>> So how is this case handled?
>>>
>>> Really appreciate the help!
>>> -Kaustubh
>>>
>>>
>>>
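
The failure scenario and the recovery behaviour discussed in this thread can be modelled with a small, self-contained sketch. This is a toy in-memory model in Python, not Flink's actual Java code: FakeS3, commit and commit_after_recovery are hypothetical names chosen for illustration, and the only behaviour taken from the thread is that re-committing a completed MPU fails with 'NoSuchUpload', and that the recovery path then checks that the file exists and has the expected length.

```python
class NoSuchUpload(Exception):
    """Stands in for S3's 404 'NoSuchUpload' error."""


class FakeS3:
    """Toy in-memory stand-in for S3 multipart uploads (hypothetical, not a real client)."""

    def __init__(self):
        self.pending = {}  # upload_id -> (key, data) for uploads not yet completed
        self.objects = {}  # key -> data for completed objects

    def start_upload(self, upload_id, key, data):
        self.pending[upload_id] = (key, data)

    def complete_upload(self, upload_id):
        # Completing an unknown or already-completed upload fails.
        if upload_id not in self.pending:
            raise NoSuchUpload(upload_id)
        key, data = self.pending.pop(upload_id)
        self.objects[key] = data

    def object_length(self, key):
        data = self.objects.get(key)
        return None if data is None else len(data)


def commit(s3, upload_id):
    """Normal commit path: any failure propagates to the caller."""
    s3.complete_upload(upload_id)


def commit_after_recovery(s3, upload_id, key, expected_length):
    """Recovery commit path: tolerate an MPU that was already completed."""
    try:
        s3.complete_upload(upload_id)
    except NoSuchUpload:
        # The commit failed. If the object is there with the expected
        # length, the earlier commit went through before the crash.
        if s3.object_length(key) != expected_length:
            raise
```

In this model, calling commit twice for the same upload raises NoSuchUpload on the second call, which is the loop Kaustubh was worried about. Calling commit_after_recovery instead returns normally as long as the object is present with the recorded length, so the restored job can continue.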
