Anyone?

On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> You don't have to. Thank you for the input.
>
> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vict...@gmail.com> wrote:
>
>> My apologies for not seeing your use case properly. The constraint on
>> the rolling policy applies only to bulk formats such as Parquet, as
>> highlighted in the docs.
>>
>> As for your questions, I'll have to defer to others more familiar with
>> it. I mostly just use bulk formats such as Avro and Parquet.
>>
>> Tim
>>
>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>>
>>> That said, in the DefaultRollingPolicy the checkpoint check seems to be
>>> on the file size (it mimics the check in shouldRollOnEvent()).
>>>
>>> I guess the questions are:
>>>
>>> Is the call to shouldRollOnCheckpoint made by the checkpointing
>>> thread?
>>>
>>> Are the calls to the other two methods, shouldRollOnEvent and
>>> shouldRollOnProcessingTime, made on the execution thread, i.e. inlined?
>>>
>>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <vishal.santo...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the quick reply.
>>>>
>>>> I am confused. If this were a more full-featured BucketingSink, I would
>>>> imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime,
>>>> an in-progress file could go into the pending phase, and on checkpoint
>>>> the pending part file would be finalized. For exactly-once, every
>>>> in-progress file would have its length snapshotted to the checkpoint. On
>>>> a resume from that checkpoint, the length would be used to truncate the
>>>> file (if truncate is supported) or written to a valid-length file (if it
>>>> is not), to indicate which part of the finalized file (finalized on
>>>> resume) is valid. I had also always assumed (and no doc says otherwise)
>>>> that shouldRollOnCheckpoint would behave like the other two, apart from
>>>> the fact that it rolls and finalizes in a single step on a checkpoint.
>>>>
>>>> Am I better off using BucketingSink? When to use BucketingSink and when
>>>> to use StreamingFileSink is not clear at all, even though on the surface
>>>> StreamingFileSink sure looks like a better version of BucketingSink (or
>>>> not).
>>>>
>>>> Regards.
>>>>
>>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com>
>>>> wrote:
>>>>
>>>>> I think the only rolling policy that can be used to ensure exactly-once
>>>>> is CheckpointRollingPolicy.
>>>>>
>>>>> Tim
>>>>>
>>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Can StreamingFileSink be used in place of the BucketingSink described
>>>>>> here? It looks like it could:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html
>>>>>>
>>>>>> Take this code, for example:
>>>>>>
>>>>>> StreamingFileSink
>>>>>>     .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
>>>>>>     .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>>     .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>>         @Override
>>>>>>         public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState)
>>>>>>                 throws IOException {
>>>>>>             // Never roll just because a checkpoint is taken.
>>>>>>             return false;
>>>>>>         }
>>>>>>
>>>>>>         @Override
>>>>>>         public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>>>>>                 KafkaRecord element) throws IOException {
>>>>>>             // Roll once the part file exceeds 1 GiB.
>>>>>>             return partFileState.getSize() > 1024L * 1024 * 1024;
>>>>>>         }
>>>>>>
>>>>>>         @Override
>>>>>>         public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>>>>>>                 long currentTime) throws IOException {
>>>>>>             // Roll after 10 minutes of inactivity or 120 minutes since creation.
>>>>>>             return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>>>>                     || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>>>>         }
>>>>>>     })
>>>>>>     .build();
>>>>>>
>>>>>> A few things I see and am not sure I follow about the new
>>>>>> StreamingFileSink compared with BucketingSink:
>>>>>>
>>>>>> 1. I never see the in-progress file go to the pending state, i.e. get
>>>>>> renamed as pending, as was the case in BucketingSink. I would assume
>>>>>> that it would become pending and then be finalized on checkpoint for
>>>>>> exactly-once semantics?
>>>>>>
>>>>>> 2. I see dangling in-progress files at the end of the day. Given that
>>>>>> withBucketCheckInterval is set to 1 minute by default, I would assume
>>>>>> that shouldRollOnProcessingTime should kick in?
>>>>>>
>>>>>> 3. The in-progress files are named like
>>>>>> .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14
>>>>>> What is that additional suffix?
>>>>>>
>>>>>> I have the following setup on the env:
>>>>>>
>>>>>> env.enableCheckpointing(10 * 60000); // checkpoint every 10 minutes
>>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>>> // Up to 4 restart attempts, 1 minute apart.
>>>>>> env.setRestartStrategy(fixedDelayRestart(4,
>>>>>>         org.apache.flink.api.common.time.Time.minutes(1)));
>>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>>> env.setStateBackend(stateBackEnd);
>>>>>>
>>>>>> Regards.
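On the DefaultRollingPolicy remark: as we read the Flink 1.7 sources, its shouldRollOnCheckpoint applies the same size test as shouldRollOnEvent, while shouldRollOnProcessingTime looks at file age and inactivity. The Flink-free Java model here makes that comparison concrete. The class name DefaultPolicyModel and the default thresholds (128 MiB part size, 60 s rollover, 60 s inactivity) are our assumptions about that version, not code copied from Flink.

```java
// Flink-free model of DefaultRollingPolicy's three checks, as we read the Flink 1.7 sources.
// Not Flink code: the class name and the default thresholds below are assumptions.
final class DefaultPolicyModel {
    static final long MAX_PART_SIZE_BYTES = 128L * 1024 * 1024; // assumed default: 128 MiB
    static final long ROLLOVER_INTERVAL_MS = 60_000L;            // assumed default: 60 s
    static final long INACTIVITY_INTERVAL_MS = 60_000L;          // assumed default: 60 s

    // Checkpoint check: the same size test as the per-event check.
    static boolean shouldRollOnCheckpoint(long partSizeBytes) {
        return partSizeBytes > MAX_PART_SIZE_BYTES;
    }

    // Per-event check: roll once the part file is larger than the limit.
    static boolean shouldRollOnEvent(long partSizeBytes) {
        return partSizeBytes > MAX_PART_SIZE_BYTES;
    }

    // Timer check: roll if the file is too old or has been idle too long.
    static boolean shouldRollOnProcessingTime(long creationTime, long lastUpdateTime, long now) {
        return now - creationTime >= ROLLOVER_INTERVAL_MS
                || now - lastUpdateTime >= INACTIVITY_INTERVAL_MS;
    }

    public static void main(String[] args) {
        long small = 1024L;              // 1 KiB written so far
        long large = 200L * 1024 * 1024; // 200 MiB written so far
        System.out.println(shouldRollOnCheckpoint(small) + " " + shouldRollOnEvent(small)); // false false
        System.out.println(shouldRollOnCheckpoint(large) + " " + shouldRollOnEvent(large)); // true true
        System.out.println(shouldRollOnProcessingTime(0L, 30_000L, 45_000L)); // false
        System.out.println(shouldRollOnProcessingTime(0L, 30_000L, 61_000L)); // true
    }
}
```

With a size-only checkpoint check, a small file is never rolled by a checkpoint alone, which is why such a policy behaves differently from one that rolls on every checkpoint.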
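The exactly-once recovery described for BucketingSink (snapshot the in-progress file's length, then truncate back to it on restore) can be sketched with plain java.nio. This is a hypothetical single-file model; ValidLengthDemo and its methods are illustrative names. When truncate is unavailable, BucketingSink instead records the length in a separate valid-length file, which this sketch does not model.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical model of valid-length recovery: the in-progress file's length is recorded
// at checkpoint time and, on restore, the file is truncated back to that length so that
// records written after the checkpoint are discarded.
final class ValidLengthDemo {

    // Appends text to the file and returns the file's new length in bytes.
    static long append(Path file, String text) throws IOException {
        Files.write(file, text.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        return Files.size(file);
    }

    // Cuts the file back to the length recorded in the (simulated) checkpoint.
    static void restore(Path file, long validLength) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            channel.truncate(validLength);
        }
    }

    // Runs write, checkpoint, write, failure, restore in a temp file; returns what survives.
    static String simulate() {
        try {
            Path file = Files.createTempFile("part-0-0", ".inprogress");
            try {
                append(file, "record-1\n");
                long checkpointedLength = append(file, "record-2\n"); // checkpoint taken here
                append(file, "record-3\n");                           // written after it, then a failure
                restore(file, checkpointedLength);
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(simulate()); // prints record-1 and record-2, one per line
    }
}
```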
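The in-progress, pending, finished lifecycle discussed in the thread can be modeled as a small state machine: a roll turns the open file into a pending one, and a completed checkpoint commits every pending file. The rollOnCheckpoint flag stands in for a policy that rolls at each checkpoint, which is what bulk formats such as Parquet require. This is a simplified sketch with hypothetical names; it collapses snapshot and checkpoint completion into one step and tracks states in memory only, so it says nothing about whether pending files are renamed on disk.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, Flink-free model of the part-file lifecycle discussed in the thread.
// Hypothetical names; snapshot and checkpoint completion are collapsed into one step,
// and only in-memory states are tracked (no files are created or renamed).
final class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    private static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;

        PartFile(String name) {
            this.name = name;
        }
    }

    private final List<PartFile> files = new ArrayList<>();
    private PartFile current; // the single open in-progress file, or null
    private int partCounter = 0;

    // Writing a record opens a new in-progress file if none is open.
    void write() {
        if (current == null) {
            current = new PartFile("part-0-" + partCounter++);
            files.add(current);
        }
    }

    // A rolling-policy decision closes the in-progress file, which then waits as PENDING.
    void roll() {
        if (current != null) {
            current.state = State.PENDING;
            current = null;
        }
    }

    // A checkpoint optionally rolls first (as bulk formats require),
    // then commits every PENDING file.
    void checkpoint(boolean rollOnCheckpoint) {
        if (rollOnCheckpoint) {
            roll();
        }
        for (PartFile f : files) {
            if (f.state == State.PENDING) {
                f.state = State.FINISHED;
            }
        }
    }

    long count(State state) {
        return files.stream().filter(f -> f.state == state).count();
    }

    public static void main(String[] args) {
        PartFileLifecycle sink = new PartFileLifecycle();
        sink.write();           // part-0-0 is in progress
        sink.roll();            // part-0-0 is pending
        sink.write();           // part-0-1 is in progress
        sink.checkpoint(false); // part-0-0 finished; part-0-1 still in progress
        System.out.println(sink.count(State.FINISHED) + " finished, "
                + sink.count(State.IN_PROGRESS) + " in progress"); // 1 finished, 1 in progress
        sink.checkpoint(true);  // part-0-1 rolled and finished
        System.out.println(sink.count(State.FINISHED) + " finished, "
                + sink.count(State.IN_PROGRESS) + " in progress"); // 2 finished, 0 in progress
    }
}
```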
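For question 2, the interaction between the 1-minute bucket check interval, the custom shouldRollOnProcessingTime in the thread, and the 10-minute checkpoint interval can be worked through with a small simulation. It assumes timer ticks fall on multiples of the check interval and that a rolled file is committed by the first checkpoint completing strictly after the roll, on multiples of the checkpoint interval; both are simplifications, and RollTimingSimulation is a hypothetical name. With the last write at 5 min 30 s, the inactivity condition first holds at the 16-minute tick and the file would be committed at the 20-minute checkpoint.

```java
// Flink-free simulation for question 2. Assumptions: timer ticks fall on multiples of the
// check interval, and a rolled file is committed by the first checkpoint that completes
// strictly after the roll, with checkpoints completing on multiples of their interval.
final class RollTimingSimulation {
    static final long MINUTE_MS = 60_000L;

    // Same condition as the custom shouldRollOnProcessingTime in the thread.
    static boolean customPolicy(long creationTime, long lastUpdateTime, long now) {
        return now - lastUpdateTime > 10 * MINUTE_MS || now - creationTime > 120 * MINUTE_MS;
    }

    // First timer tick after the last write at which the custom policy fires.
    // Terminates because the inactivity condition eventually holds.
    static long firstRollTick(long creationTime, long lastUpdateTime, long checkIntervalMs) {
        long tick = (lastUpdateTime / checkIntervalMs + 1) * checkIntervalMs;
        while (!customPolicy(creationTime, lastUpdateTime, tick)) {
            tick += checkIntervalMs;
        }
        return tick;
    }

    // First checkpoint strictly after the roll, which would commit the rolled file.
    static long commitTime(long rollTime, long checkpointIntervalMs) {
        return (rollTime / checkpointIntervalMs + 1) * checkpointIntervalMs;
    }

    public static void main(String[] args) {
        long lastWrite = 5 * MINUTE_MS + 30_000L;            // last record at 5 min 30 s
        long roll = firstRollTick(0L, lastWrite, MINUTE_MS); // 1-minute check interval
        long commit = commitTime(roll, 10 * MINUTE_MS);      // 10-minute checkpoints
        System.out.println("roll at " + roll / MINUTE_MS + " min, commit at "
                + commit / MINUTE_MS + " min"); // roll at 16 min, commit at 20 min
    }
}
```

In this model a file that rolls after the last successful checkpoint stays uncommitted until another checkpoint completes, which is one possible, unconfirmed, reason for in-progress files lingering at the end of a day.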
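For question 3, a hypothetical parser that splits a name like .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14 into its pieces. Our reading, not confirmed anywhere in this thread, is that part-4-45 is the final file name (subtask 4, part counter 45) and that the random UUID keeps each temporary in-progress file name unique. InProgressName and finalName() are illustrative names only.

```java
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for hidden in-progress part-file names. The interpretation of the
// pieces (subtask index, part counter, unique temp-file suffix) is an assumption.
final class InProgressName {
    private static final Pattern NAME = Pattern.compile(
            "\\.part-(\\d+)-(\\d+)\\.inprogress\\.([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})");

    final int subtask;
    final long partCounter;
    final UUID suffix;

    private InProgressName(int subtask, long partCounter, UUID suffix) {
        this.subtask = subtask;
        this.partCounter = partCounter;
        this.suffix = suffix;
    }

    // Parses e.g. ".part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14".
    static InProgressName parse(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("not an in-progress part file: " + fileName);
        }
        return new InProgressName(Integer.parseInt(m.group(1)),
                Long.parseLong(m.group(2)), UUID.fromString(m.group(3)));
    }

    // Name the file is assumed to get once committed: no leading dot and
    // no ".inprogress.<uuid>" suffix.
    String finalName() {
        return "part-" + subtask + "-" + partCounter;
    }

    public static void main(String[] args) {
        InProgressName n = parse(".part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14");
        System.out.println(n.subtask + " " + n.partCounter + " " + n.finalName()); // 4 45 part-4-45
    }
}
```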