Hi all,

I’ve got a Flink job that uses Kinesis as the source and S3 files as the sink. The
sink rolls its part files on checkpoints, and checkpointing itself is configured
as EXACTLY_ONCE. While the job is running everything looks good: a new batch of
files appears on S3 every minute (the checkpoint interval is 60s).
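
For reference, the checkpointing is configured roughly like this (a sketch of my setup; the surrounding environment code is the usual boilerplate):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Checkpoint every 60s in exactly-once mode, matching the setup above.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);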

The problem appears when I stop the job with a savepoint. The job generates a
savepoint that contains the Kinesis offsets, but new files are uploaded to S3
containing records that go beyond the offsets stored in the savepoint.
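
I stop the job with the standard CLI command (the job id and savepoint path below are placeholders):

    ./bin/flink stop --savepointPath s3://<bucket>/savepoints <jobId>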

I doubt that’s the normal behavior, because it breaks the exactly-once guarantee.
Has anyone encountered this kind of behavior?

That’s how the sink is defined:
FileSink.forBulkFormat(new Path(basePath), new ParquetWriterFactory<>(parquetBuilder))
    .withBucketAssigner(new DateTimeBucketAssigner<>(dateTimeFormat))
    .withOutputFileConfig(OutputFileConfig.builder()
        .withPartPrefix(String.format("part-%s", UUID.randomUUID()))
        .withPartSuffix(String.format("%s.parquet", compressionCodecName.getExtension()))
        .build())
    .build();

Thanks,
Vararu Vadim.
