Hi my friends,

I am using the FileSystem connector in Flink SQL, and I found that my success file is committed late, probably dozens of minutes late.

Here is some information:

1. Flink version is 1.11.1.

2. Source DDL:

    create table test (
      `timestamp` bigint,
      event_time as _timestamp(timestamp),
      WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE
    )...

3. Sink DDL:

    create table sinkTest(
      xxx
      dtm VARCHAR,
      hh VARCHAR
    ) PARTITIONED BY (dtm, hh)
    with(
      'connector' = 'filesystem',
      'format' = 'parquet',
      'parquet.compression' = 'SNAPPY',
      'sink.rolling-policy.file-size' = '512MB',
      'sink.rolling-policy.check-interval' = '10 min',
      'sink.partition-commit.trigger' = 'partition-time',
      'sink.partition-commit.delay' = '1 h',
      'sink.partition-commit.policy.kind' = 'success-file',
      'sink.file-suffix' = '.parquet',
      'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00'
    )

4. The checkpoint interval of the job is 5 minutes, and all checkpoints succeed.

I think that if my job is not lagging, the success file should be committed at about 10 minutes past each hour, but in fact it is committed much later.

Here is the source code that decides when the success file is committed. A partition can be committed once the watermark is greater than the partition time plus the commit delay.

    public List<String> committablePartitions(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(String.format(
                "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                checkpointId, watermarks));
        }

        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();

        List<String> needCommit = new ArrayList<>();
        Iterator<String> iter = pendingPartitions.iterator();
        while (iter.hasNext()) {
            String partition = iter.next();
            LocalDateTime partTime = extractor.extract(
                partitionKeys, extractPartitionValues(new Path(partition)));
            if (watermark > toMills(partTime) + commitDelay) {
                needCommit.add(partition);
                iter.remove();
            }
        }
        return needCommit;
    }

Best,
Forideal
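
To make the commit condition above concrete, here is a minimal standalone sketch of the same comparison applied to the settings in this post (watermark lag of 10 minutes, commit delay of 1 hour). The class name `CommitTimingSketch`, the helper names, the example date, and the choice to interpret the partition time as UTC are all assumptions made for illustration; this is not the Flink implementation.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class CommitTimingSketch {

    // Epoch milliseconds for a local date-time, interpreted as UTC (assumption of this sketch).
    static long toMillis(LocalDateTime t) {
        return t.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Watermark as declared in the source DDL: event_time - INTERVAL '10' MINUTE.
    static long watermarkFor(LocalDateTime maxEventTime, Duration watermarkLag) {
        return toMillis(maxEventTime) - watermarkLag.toMillis();
    }

    // Same strict comparison as in committablePartitions: watermark > partTime + commitDelay.
    static boolean isCommittable(long watermark, LocalDateTime partTime, Duration commitDelay) {
        return watermark > toMillis(partTime) + commitDelay.toMillis();
    }

    public static void main(String[] args) {
        Duration watermarkLag = Duration.ofMinutes(10); // from the source DDL
        Duration commitDelay = Duration.ofHours(1);     // 'sink.partition-commit.delay' = '1 h'

        // Hypothetical partition dtm=2020-08-01, hh=10, i.e. partition time 10:00:00.
        LocalDateTime partTime = LocalDateTime.of(2020, 8, 1, 10, 0);

        // Hypothetical maximum event times seen so far.
        LocalDateTime[] eventTimes = {
            LocalDateTime.of(2020, 8, 1, 11, 5),
            LocalDateTime.of(2020, 8, 1, 11, 10),
            LocalDateTime.of(2020, 8, 1, 11, 11),
        };

        for (LocalDateTime eventTime : eventTimes) {
            long watermark = watermarkFor(eventTime, watermarkLag);
            System.out.println(eventTime + " committable="
                + isCommittable(watermark, partTime, commitDelay));
        }
    }
}
```

Under these assumptions, the 10:00 partition is not yet committable when the maximum event time is 11:05 or exactly 11:10 (the comparison is strict), and becomes committable once event time passes 11:10. Since `committablePartitions` takes a checkpoint id, the commit would then presumably happen at the next checkpoint, which with a 5-minute interval could add up to roughly 5 more minutes.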