Thanks for the quick reply. I am confused. If this were a more full-featured BucketingSink, I would imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime, an in-progress file could go into the pending phase, and on checkpoint the pending part file would be finalized. For exactly-once, any in-progress file would have its length snapshotted to the checkpoint. If a resume from that checkpoint were to happen, that length would be used to truncate the file (if truncate is supported) or written out as a valid-length file (if it is not), to indicate which part of the finalized file (finalized on resume) is valid. I had always assumed (and there is no documentation saying otherwise) that shouldRollOnCheckpoint would behave like the other two, except that it does the roll and the finalize in a single step on a checkpoint.
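The lifecycle described above (in-progress, then pending after a roll, then finalized when a checkpoint completes) can be sketched as a small state machine. This is a hypothetical plain-Java toy model, not Flink's StreamingFileSink or BucketingSink code: the class and method names are made up for illustration, and it ignores truncation and valid-length recovery entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical toy model (not Flink code) of one bucket's part-file lifecycle:
 * IN_PROGRESS -> PENDING (when rolled) -> FINISHED (when a checkpoint that
 * covers the pending file completes).
 */
class PartFileLifecycle {
    private final boolean rollOnCheckpoint;
    private int nextPartIndex = 0;
    private String inProgress;                                      // open part file, or null
    private final List<String> pendingSinceLastSnapshot = new ArrayList<>();
    private final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> finished = new ArrayList<>();

    PartFileLifecycle(boolean rollOnCheckpoint) {
        this.rollOnCheckpoint = rollOnCheckpoint;
    }

    /** Appends a record, opening a new in-progress part file if none is open. */
    void write(String record) {
        if (inProgress == null) {
            inProgress = "part-0-" + nextPartIndex++;
        }
        // A real sink would append the record bytes here; the model only tracks file names.
    }

    /** Size- or time-based roll: the open file is closed and becomes PENDING. */
    void roll() {
        if (inProgress != null) {
            pendingSinceLastSnapshot.add(inProgress);
            inProgress = null;
        }
    }

    /** Checkpoint snapshot: optionally roll, then tie all pending files to this checkpoint id. */
    void snapshot(long checkpointId) {
        if (rollOnCheckpoint) {
            roll();
        }
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingSinceLastSnapshot));
        pendingSinceLastSnapshot.clear();
    }

    /** Checkpoint completed: pending files from it and from earlier checkpoints become FINISHED. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> covered = pendingPerCheckpoint.headMap(checkpointId, true);
        covered.values().forEach(finished::addAll);
        covered.clear();                                            // also removes them from the backing map
    }

    String inProgress() { return inProgress; }
    List<String> finished() { return finished; }

    public static void main(String[] args) {
        // Roll on checkpoint: one checkpoint both rolls the file and, once complete, finalizes it.
        PartFileLifecycle rolling = new PartFileLifecycle(true);
        rolling.write("a");
        rolling.snapshot(1);
        rolling.notifyCheckpointComplete(1);
        System.out.println(rolling.finished());                          // [part-0-0]

        // No roll on checkpoint: the file stays in progress across checkpoints until a
        // size/time roll happens; the next completed checkpoint then finalizes it.
        PartFileLifecycle noRoll = new PartFileLifecycle(false);
        noRoll.write("a");
        noRoll.snapshot(1);
        noRoll.notifyCheckpointComplete(1);
        System.out.println(noRoll.inProgress() + " " + noRoll.finished()); // part-0-0 []
        noRoll.roll();
        noRoll.snapshot(2);
        noRoll.notifyCheckpointComplete(2);
        System.out.println(noRoll.finished());                           // [part-0-0]
    }
}
```

In this model, "roll on checkpoint" is simply a roll performed inside the snapshot step, so the file is finalized by that same checkpoint's completion rather than by a later one.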
Am I better off using BucketingSink? When to use BucketingSink and when to use StreamingFileSink is not clear at all, even though on the surface StreamingFileSink sure looks like a better version of BucketingSink (or not).

Regards.

On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com> wrote:
> I think the only rolling policy that can be used is
> CheckpointRollingPolicy to ensure exactly once.
>
> Tim
>
> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Can StreamingFileSink be used instead of
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html,
>> even though it looks like it could?
>>
>> Take this code, for example:
>>
>>     StreamingFileSink
>>         .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
>>         .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>         .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>             @Override
>>             public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState)
>>                     throws IOException {
>>                 return false;
>>             }
>>
>>             @Override
>>             public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>                                              KafkaRecord element) throws IOException {
>>                 return partFileState.getSize() > 1024 * 1024 * 1024L;
>>             }
>>
>>             @Override
>>             public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>>                                                       long currentTime) throws IOException {
>>                 return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>                         || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>             }
>>         })
>>         .build();
>>
>> There are a few things I see and am not sure I follow about the new
>> StreamingFileSink vis-à-vis BucketingSink:
>>
>> 1. I never see the in-progress file go to the pending state (renamed as
>> pending), as was the case in BucketingSink. I would assume that it would be
>> pending and then finalized on checkpoint for exactly-once semantics?
>>
>> 2. I see dangling in-progress files at the end of the day. I would assume
>> that, with withBucketCheckInterval set to 1 minute by default,
>> shouldRollOnProcessingTime should kick in?
>>
>> 3. The in-progress files look like
>> .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that
>> additional suffix?
>>
>> I have the following set up on the env:
>>
>>     env.enableCheckpointing(10 * 60000);
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     env.setRestartStrategy(fixedDelayRestart(4,
>>             org.apache.flink.api.common.time.Time.minutes(1)));
>>     StateBackend stateBackEnd = new MemoryStateBackend();
>>     env.setStateBackend(stateBackEnd);
>>
>> Regards.
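To check what the quoted rolling policy actually decides (point 2), its conditions can be pulled out into plain Java and exercised without a Flink cluster. This is a hypothetical sketch: RollDecisions and PartStats are made-up names, and PartStats only mimics the three getters the quoted code calls on PartFileInfo (getSize, getCreationTime, getLastUpdateTime).

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical plain-Java copy of the roll conditions in the quoted RollingPolicy,
 * so the thresholds can be checked without Flink. PartStats stands in for
 * PartFileInfo (size, creation time, last update time).
 */
class RollDecisions {
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L;      // 1 GiB
    static final long MAX_IDLE_MS = TimeUnit.MINUTES.toMillis(10);      // 10 minutes without writes
    static final long MAX_AGE_MS = TimeUnit.MINUTES.toMillis(120);      // 2 hours since creation

    static final class PartStats {
        final long sizeBytes;
        final long creationTimeMs;
        final long lastUpdateTimeMs;

        PartStats(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
            this.sizeBytes = sizeBytes;
            this.creationTimeMs = creationTimeMs;
            this.lastUpdateTimeMs = lastUpdateTimeMs;
        }
    }

    /** Same condition as the quoted shouldRollOnEvent: strictly larger than 1 GiB. */
    static boolean shouldRollOnEvent(PartStats part) {
        return part.sizeBytes > MAX_PART_SIZE_BYTES;
    }

    /** Same condition as the quoted shouldRollOnProcessingTime: idle too long or open too long. */
    static boolean shouldRollOnProcessingTime(PartStats part, long nowMs) {
        return nowMs - part.lastUpdateTimeMs > MAX_IDLE_MS
                || nowMs - part.creationTimeMs > MAX_AGE_MS;
    }

    public static void main(String[] args) {
        // Part created at t = 0, last written at t = 5 min.
        PartStats part = new PartStats(1_000_000L, 0L, TimeUnit.MINUTES.toMillis(5));

        // t = 14 min: idle 9 min, age 14 min -> no roll yet.
        System.out.println(shouldRollOnProcessingTime(part, TimeUnit.MINUTES.toMillis(14))); // false
        // t = 16 min: idle 11 min -> roll.
        System.out.println(shouldRollOnProcessingTime(part, TimeUnit.MINUTES.toMillis(16))); // true
        // Exactly 1 GiB does not roll; one more byte does.
        System.out.println(shouldRollOnEvent(new PartStats(MAX_PART_SIZE_BYTES, 0L, 0L)));     // false
        System.out.println(shouldRollOnEvent(new PartStats(MAX_PART_SIZE_BYTES + 1, 0L, 0L))); // true
    }
}
```

These checks only decide whether a file is rolled. As point 2 assumes, the processing-time condition would only be evaluated when the bucket-check timer fires, and under the lifecycle described at the top of this message a rolled file would still wait for a completed checkpoint (every 10 minutes with the setup above) before being finalized.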