No problem!

On Wed, Feb 6, 2019 at 6:38 PM Kaustubh Rudrawar <kaust...@box.com> wrote:
> Hi Kostas,
>
> Thanks for the response! Yes - I see commitAfterRecovery being called
> when a Bucket is restored. I had confused myself into thinking that
> 'onSuccessfulCompletionOfCheckpoint' is called on restore as well, which
> led me to believe that we were only calling commit and not
> commitAfterRecovery.
>
> Thanks for the clarification!
> -Kaustubh
>
> On Wed, Feb 6, 2019 at 2:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi Kaustubh,
>>
>> Your general understanding is correct.
>>
>> In this case, though, the sink will call the
>> S3Committer#commitAfterRecovery() method. When this method fails to
>> commit the MPU, it checks whether the file exists and whether its length
>> is correct. If everything is in order (which is the case in your
>> example), it continues with normal execution.
>>
>> I hope this helps.
>>
>> Kostas
>>
>> On Wed, Feb 6, 2019 at 7:47 AM Kaustubh Rudrawar <kaust...@box.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to understand the exactly-once semantics of the
>>> StreamingFileSink with S3 in Flink 1.7.1, and I'm a bit confused about
>>> how it guarantees exactly once under a very specific failure scenario.
>>>
>>> For simplicity, let's say we roll the current part file on checkpoint
>>> (and only on checkpoint). The process is as follows:
>>> 1. The framework tells the sink to prepare for a checkpoint. This
>>> ultimately results in 'onReceptionOfCheckpoint' being called on
>>> Bucket.java.
>>> 2. Based on our policy of rolling on checkpoint, the current part file
>>> is closed and uploaded to S3 as part of a multipart upload (MPU), and
>>> the reference to this upload is stored in 'pendingPartsPerCheckpoint'.
>>> 3. Once the checkpoint completes successfully, the bucket is notified
>>> via 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket
>>> goes through all pendingPartsPerCheckpoint and, for each of them,
>>> recovers the in-progress part (which doesn't exist in this scenario)
>>> and then commits the upload.
>>> 4. The AmazonS3Client is ultimately called to complete the upload, and
>>> it retries the attempt up to N times. If it exhausts its retries, it
>>> throws an exception.
>>> 5. Upon successful commit of the MPU, the Bucket clears the references
>>> to these uploads from its state.
>>>
>>> Given this flow, I'm having trouble understanding how the following
>>> scenario works:
>>>
>>> - Step 4: The commit of the MPU succeeds.
>>> - Step 5: Before this step completes, the task crashes. So at this
>>> point, S3 has successfully completed the MPU, but to the client (the
>>> Flink job) it has not completed.
>>> - Flink will then recover from the checkpoint we just took, and steps
>>> 3 and 4 will be repeated. My understanding is that, since the MPU
>>> previously succeeded, any attempt at re-committing that upload will
>>> result in a 404 ('NoSuchUpload'). So step 4 should throw an exception,
>>> which would then get retried by the framework, and this process would
>>> repeat itself.
>>>
>>> So how is this case handled?
>>>
>>> Really appreciate the help!
>>> -Kaustubh
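
For completeness, the recovery logic described above boils down to: try to
complete the MPU; if S3 reports the upload as gone, check whether the target
object already exists with the expected length, and treat that as a
successful commit. Below is a minimal sketch of that idea in Java against
the AWS SDK v1 interfaces the thread mentions. It is not the actual
S3Committer source; the class and parameter names (RecoveringCommitter,
expectedLength) are illustrative placeholders.

    import java.io.IOException;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.AmazonS3Exception;
    import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    import com.amazonaws.services.s3.model.ObjectMetadata;

    final class RecoveringCommitter {

        // Tries to complete the MPU; if S3 no longer knows the upload id,
        // checks whether the target object already exists with the expected
        // length, which means a previous attempt committed it before the
        // failure.
        void commitAfterRecovery(
                AmazonS3 s3,
                CompleteMultipartUploadRequest request,
                long expectedLength) throws IOException {
            try {
                s3.completeMultipartUpload(request);
            } catch (AmazonS3Exception completeError) {
                try {
                    ObjectMetadata metadata = s3.getObjectMetadata(
                            request.getBucketName(), request.getKey());
                    if (metadata.getContentLength() == expectedLength) {
                        // The object is already there with the right length:
                        // the commit succeeded before the crash, so we are done.
                        return;
                    }
                } catch (AmazonS3Exception metadataError) {
                    completeError.addSuppressed(metadataError);
                }
                throw new IOException(
                        "Could not commit MPU for " + request.getKey()
                                + " after recovery", completeError);
            }
        }
    }

This is why the duplicate commit in the scenario above is treated as a
success rather than surfacing the 404: the object's presence and length
serve as the idempotence check.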