Hi Kostas,

Thanks for the response! Yes - I see commitAfterRecovery being called when a Bucket is restored. I had confused myself by thinking that 'onSuccessfulCompletionOfCheckpoint' is called on restore as well, which led me to believe that we were only calling commit and never commitAfterRecovery.
Thanks for the clarification!
-Kaustubh

On Wed, Feb 6, 2019 at 2:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Kaustubh,
>
> Your general understanding is correct.
>
> In this case, though, the sink will call the
> S3Committer#commitAfterRecovery() method.
> After failing to commit the MPU, this method will check whether the file
> is there and whether its length is correct. If everything is OK (which is
> the case in your example), it will continue with normal execution.
>
> I hope this helps.
>
> Kostas
>
> On Wed, Feb 6, 2019 at 7:47 AM Kaustubh Rudrawar <kaust...@box.com> wrote:
>
>> Hi,
>>
>> I'm trying to understand the exactly-once semantics of the
>> StreamingFileSink with S3 in Flink 1.7.1, and am a bit confused about how
>> it guarantees exactly once under a very specific failure scenario.
>>
>> For simplicity, let's say we roll the current part file on checkpoint
>> (and only on checkpoint). The process is as follows:
>>
>> 1. The framework tells the sink to prepare for a checkpoint. This
>> ultimately results in 'onReceptionOfCheckpoint' being called on
>> Bucket.java.
>> 2. Based on our roll policy of rolling on checkpoint, this closes the
>> current file and uploads it to S3 as part of a multipart upload (MPU);
>> the reference to this upload is stored in 'pendingPartsPerCheckpoint'.
>> 3. Once the checkpoint completes successfully, the bucket is notified via
>> 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes
>> through all pendingPartsPerCheckpoint and, for each of them, recovers the
>> in-progress part (which doesn't exist in this scenario) and then commits
>> the upload.
>> 4. The AmazonS3Client is ultimately called to perform the upload, and it
>> retries the attempt up to N times. If it exhausts its retries, it throws
>> an exception.
>> 5. Upon successful commit of the MPU, the Bucket clears the references to
>> these uploads from its state.
>>
>> Given this flow, I'm having trouble understanding how the following
>> scenario works:
>>
>> - Step 4: The commit on the MPU succeeds.
>> - Step 5: Before this step completes, the task crashes. So at this
>> point, S3 has successfully completed the MPU, but to the client (the
>> Flink job) it has not completed.
>> - Flink will then recover from the checkpoint we just took, and steps 3
>> and 4 will be repeated. My understanding is that, since the MPU succeeded
>> previously, any attempt at re-committing that upload will result in a 404
>> ('NoSuchUpload'). So step 4 should throw an exception, which would then
>> get retried by the framework, and this process repeats itself.
>>
>> So how is this case handled?
>>
>> Really appreciate the help!
>> -Kaustubh
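The recovery path Kostas describes above can be sketched as follows. This is a hypothetical, self-contained model, not Flink's actual S3Committer code: `ObjectStore`, `RecoverableCommitter`, and all method names here are illustrative stand-ins. The point it demonstrates is the one from the thread: on restore, a failed `completeMultipartUpload` (404 NoSuchUpload) is tolerated if the target object already exists with the expected length, which is exactly the crash-between-commit-and-state-cleanup scenario.

```java
import java.util.Optional;

// Illustrative stand-in for the object store; not the AWS SDK API.
interface ObjectStore {
    /** Completes a multipart upload; throws if the upload id no longer exists. */
    void completeMultipartUpload(String uploadId);

    /** Length of the committed object, if it exists. */
    Optional<Long> objectLength(String key);
}

// Illustrative stand-in for the committer logic described in the thread.
final class RecoverableCommitter {
    private final ObjectStore store;

    RecoverableCommitter(ObjectStore store) {
        this.store = store;
    }

    /** Normal path: called on a successful checkpoint-complete notification. */
    void commit(String uploadId) {
        store.completeMultipartUpload(uploadId);
    }

    /**
     * Restore path: tolerate an MPU that a previous attempt already completed.
     * If the commit fails but the object exists with the expected length, the
     * earlier commit must have succeeded, so we continue normal execution.
     */
    void commitAfterRecovery(String uploadId, String key, long expectedLength) {
        try {
            store.completeMultipartUpload(uploadId);
        } catch (RuntimeException commitFailure) {
            // Commit failed (e.g. 404 NoSuchUpload): check whether a previous
            // attempt already committed the object before the crash.
            long actual = store.objectLength(key).orElseThrow(() ->
                    new IllegalStateException("object missing after failed commit", commitFailure));
            if (actual != expectedLength) {
                throw new IllegalStateException("unexpected length " + actual, commitFailure);
            }
            // Object present with the right length: treat the commit as done.
        }
    }
}
```

Under this model, the double-commit scenario from the question is benign: the second attempt's 404 is caught, the length check passes, and the sink proceeds instead of retrying forever.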