Hi Kostas,

Thanks for the response! Yes, I see commitAfterRecovery being called when a
Bucket is restored. I had confused myself by thinking that
'onSuccessfulCompletionOfCheckpoint' is called on restore as well, which
led me to believe that we were only calling commit and not
commitAfterRecovery.

Thanks for the clarification!
-Kaustubh

On Wed, Feb 6, 2019 at 2:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Kaustubh,
>
> Your general understanding is correct.
>
> In this case, though, the sink will call the
> S3Committer#commitAfterRecovery() method.
> When the attempt to commit the MPU fails, this method checks whether the
> file is already there and whether its length is correct. If everything is
> consistent (which is the case in your example), it continues with normal
> execution.
>
> I hope this helps.
>
> Kostas
>
> On Wed, Feb 6, 2019 at 7:47 AM Kaustubh Rudrawar <kaust...@box.com> wrote:
>
>> Hi,
>>
>> I'm trying to understand the exactly-once semantics of the
>> StreamingFileSink with S3 in Flink 1.7.1, and I'm a bit confused about
>> how it guarantees exactly-once under a very specific failure scenario.
>>
>> For simplicity, let's say we roll the current part file on checkpoint
>> (and only on checkpoint). The process is as follows (a rough code sketch
>> follows the list):
>> 1. Framework tells the sink to prepare for a checkpoint. This ultimately
>> results in 'onReceptionOfCheckpoint' being called on Bucket.java.
>> 2. Based on our roll policy of rolling on checkpoint, this closes the
>> current file and uploads it to S3 as part of an MPU; the reference to
>> this upload is stored in 'pendingPartsPerCheckpoint'.
>> 3. Once the checkpoint successfully completes, the bucket is notified via
>> 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes
>> through all pendingPartsPerCheckpoint entries and, for each of them,
>> recovers the in-progress part (which doesn't exist in this scenario) and
>> then commits the upload.
>> 4. The AmazonS3Client is ultimately called to complete the MPU, and it
>> retries the attempt up to N times. If it exhausts its retries, it will
>> throw an exception.
>> 5. Upon successful commit of the MPU, Bucket clears out its references to
>> these uploads from its state.
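>>
>> To check my reading of the code, here is a rough sketch of steps 1-5 as
>> I understand them (hypothetical, heavily simplified Java; PendingUpload
>> and PartFileWriter are made-up stand-ins, not the real Flink types):
>>
>>     import java.io.IOException;
>>     import java.util.ArrayList;
>>     import java.util.Iterator;
>>     import java.util.List;
>>     import java.util.Map;
>>     import java.util.TreeMap;
>>
>>     class BucketSketch {
>>         interface PendingUpload { void commit() throws IOException; }
>>         interface PartFileWriter {
>>             PendingUpload closeForCommit() throws IOException;
>>         }
>>
>>         private final Map<Long, List<PendingUpload>> pendingPartsPerCheckpoint =
>>                 new TreeMap<>();
>>         private PartFileWriter inProgressPart;  // assumed opened elsewhere
>>
>>         // Steps 1-2: a checkpoint barrier arrives; roll the current part.
>>         void onReceptionOfCheckpoint(long checkpointId) throws IOException {
>>             if (inProgressPart != null) {
>>                 // Closing finishes the part uploads but does NOT complete
>>                 // the MPU; the pending handle is stored per checkpoint.
>>                 pendingPartsPerCheckpoint
>>                         .computeIfAbsent(checkpointId, id -> new ArrayList<>())
>>                         .add(inProgressPart.closeForCommit());
>>                 inProgressPart = null;
>>             }
>>         }
>>
>>         // Steps 3-5: the checkpoint is durable; commit what is pending.
>>         void onSuccessfulCompletionOfCheckpoint(long checkpointId)
>>                 throws IOException {
>>             Iterator<Map.Entry<Long, List<PendingUpload>>> it =
>>                     pendingPartsPerCheckpoint.entrySet().iterator();
>>             while (it.hasNext()) {
>>                 Map.Entry<Long, List<PendingUpload>> entry = it.next();
>>                 if (entry.getKey() <= checkpointId) {
>>                     for (PendingUpload pending : entry.getValue()) {
>>                         pending.commit();  // step 4: complete the MPU
>>                     }
>>                     it.remove();           // step 5: clear committed refs
>>                 }
>>             }
>>         }
>>     }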
>>
>> Given this flow, I'm having trouble understanding how the following
>> scenario works:
>>
>>    - Step 4: The commit of the MPU succeeds.
>>    - Step 5: Before this step completes, the task crashes. So at this
>>    point, S3 has successfully completed the MPU, but to the client (the
>>    Flink job) it has not completed.
>>    - Flink will then recover from the checkpoint we just took, and steps
>>    3 and 4 will be repeated. My understanding is that, since the MPU
>>    succeeded previously, any attempt at re-committing that upload will
>>    result in a 404 ('NoSuchUpload'), so step 4 should throw an exception,
>>    which would then get retried by the framework, and this process would
>>    repeat itself (see the sketch after this list).
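>>
>> In other words, on replay the re-commit in step 4 would boil down to
>> something like this (hypothetical names; plain AWS SDK v1 call):
>>
>>     import com.amazonaws.services.s3.AmazonS3;
>>     import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
>>     import com.amazonaws.services.s3.model.PartETag;
>>     import java.util.List;
>>
>>     class ReplayedCommit {
>>         // Replaying step 4 after recovery: the uploadId was already
>>         // completed before the crash, so S3 has forgotten it and this
>>         // call fails with status 404, error code "NoSuchUpload".
>>         static void recommit(AmazonS3 s3, String bucket, String key,
>>                 String uploadId, List<PartETag> parts) {
>>             s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
>>                     bucket, key, uploadId, parts));
>>         }
>>     }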
>>
>> So how is this case handled?
>>
>> Really appreciate the help!
>> -Kaustubh
>>
>>
>>
