Hi all,

I've got a Flink job that uses Kinesis as the source and S3 files as the sink. The sink rolls on checkpoints, and checkpointing itself is configured as EXACTLY_ONCE. While the job is running everything looks fine: a new batch of files appears on S3 every minute (the checkpoint interval is 60s).
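For reference, checkpointing is enabled roughly like this (a minimal sketch; env stands for my StreamExecutionEnvironment and the exact variable names may differ):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 60s checkpoint interval with exactly-once mode, matching the behavior described above
    env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);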
The problem happens when I stop the job with a savepoint. The job produces a savepoint that contains the Kinesis offsets, but new files are then uploaded to S3 containing records that go beyond the offsets recorded in the savepoint. I doubt this is normal behavior, because it breaks the exactly-once guarantee. Has anyone run into this? This is how the sink is defined:

    FileSink.forBulkFormat(new Path(basePath), new ParquetWriterFactory<>(parquetBuilder))
        .withBucketAssigner(new DateTimeBucketAssigner<>(dateTimeFormat))
        .withOutputFileConfig(OutputFileConfig.builder()
            .withPartPrefix(String.format("part-%s", UUID.randomUUID()))
            .withPartSuffix(String.format("%s.parquet", compressionCodecName.getExtension()))
            .build())
        .build();

Thanks,
Vararu Vadim.