Anyone?

On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> You don't have to. Thank you for the input.
>
> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vict...@gmail.com> wrote:
>
>> My apologies for not seeing your use case properly. The constraint on the
>> rolling policy applies only to bulk formats such as Parquet, as
>> highlighted in the docs.
>>
>> As for your questions, I'll have to defer to others more familiar with
>> it. I mostly just use bulk formats such as Avro and Parquet.
>>
>> Tim
>>
>>
>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>>
>>> That said, in DefaultRollingPolicy the shouldRollOnCheckpoint() check
>>> appears to be on the file size (it mimics the shouldRollOnEvent() check).
>>>
>>> I guess the questions are:
>>>
>>> Is the call to shouldRollOnCheckpoint() made by the checkpointing
>>> thread?
>>>
>>> Are the calls to the other two methods, shouldRollOnEvent() and
>>> shouldRollOnProcessingTime(), made on the execution thread, i.e. inlined?
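One way to answer the threading question empirically is to wrap the policy and record Thread.currentThread().getName() in each callback, then compare the names seen in a running job. The sketch below is deliberately Flink-free: RollDecider, RecordingDecider and the thread name "checkpoint-simulator" are hypothetical stand-ins that only mirror the three callbacks, not Flink's RollingPolicy interface. In a real job the same recording line would go inside the RollingPolicy methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ThreadProbe {

    /** Hypothetical stand-in for the three rolling callbacks (not Flink's interface). */
    interface RollDecider {
        boolean onCheckpoint(long partSize);
        boolean onEvent(long partSize);
        boolean onProcessingTime(long partSize, long now);
    }

    /** Decorator that records "callback@threadName" and then delegates. */
    static final class RecordingDecider implements RollDecider {
        private final RollDecider delegate;
        // Synchronized because callbacks may arrive from different threads.
        final List<String> calls = Collections.synchronizedList(new ArrayList<>());

        RecordingDecider(RollDecider delegate) {
            this.delegate = delegate;
        }

        private void record(String callback) {
            calls.add(callback + "@" + Thread.currentThread().getName());
        }

        @Override
        public boolean onCheckpoint(long partSize) {
            record("onCheckpoint");
            return delegate.onCheckpoint(partSize);
        }

        @Override
        public boolean onEvent(long partSize) {
            record("onEvent");
            return delegate.onEvent(partSize);
        }

        @Override
        public boolean onProcessingTime(long partSize, long now) {
            record("onProcessingTime");
            return delegate.onProcessingTime(partSize, now);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RollDecider never = new RollDecider() {
            public boolean onCheckpoint(long s) { return false; }
            public boolean onEvent(long s) { return false; }
            public boolean onProcessingTime(long s, long n) { return false; }
        };
        RecordingDecider probe = new RecordingDecider(never);

        probe.onEvent(10L); // invoked from the "main" thread
        Thread cp = new Thread(() -> probe.onCheckpoint(10L), "checkpoint-simulator");
        cp.start();
        cp.join();
        System.out.println(probe.calls); // [onEvent@main, onCheckpoint@checkpoint-simulator]
    }
}
```

If the names recorded for onCheckpoint differ from those for onEvent in a real job, the callbacks are not all inlined on one thread.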
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Thanks for the quick reply.
>>>>
>>>> I am confused. If this were a more full-featured BucketingSink, I would
>>>> imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime,
>>>> an in-progress file could move into the pending phase, and on checkpoint
>>>> the pending part file would be finalized. For exactly-once, any
>>>> in-progress file would have its length snapshotted into the checkpoint.
>>>> On resume from that checkpoint, that length would be used to truncate the
>>>> file (if truncate is supported), or written out as a valid-length file (if
>>>> truncate is not supported) to indicate which part of the finalized file
>>>> (finalized on resume) is valid. I had always assumed (and no doc says
>>>> otherwise) that shouldRollOnCheckpoint would be similar to the other two,
>>>> apart from the fact that it does the roll and the finalize in a single
>>>> step on a checkpoint.
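The truncate-or-valid-length recovery described above can be sketched on a local filesystem. This is an illustration of the idea only, not BucketingSink's code: restore() and the "_<part>.valid-length" marker name are assumptions, and the snapshotted length is passed in directly instead of being read from checkpoint state.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ValidLengthRecovery {

    /**
     * Restores a part file to the length recorded at the last checkpoint.
     * If truncation is supported, the file is cut back to that length;
     * otherwise a sidecar marker file records how many bytes are valid.
     */
    static Path restore(Path partFile, long snapshottedLength, boolean truncateSupported)
            throws IOException {
        if (truncateSupported) {
            try (FileChannel ch = FileChannel.open(partFile, StandardOpenOption.WRITE)) {
                ch.truncate(snapshottedLength); // drop bytes written after the checkpoint
            }
            return partFile;
        }
        // Hypothetical marker naming; the part file itself is left untouched.
        Path marker = partFile.resolveSibling("_" + partFile.getFileName() + ".valid-length");
        Files.write(marker, Long.toString(snapshottedLength).getBytes(StandardCharsets.UTF_8));
        return marker;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucket");
        Path part = dir.resolve("part-0-0");
        Files.write(part, "committed|uncommitted".getBytes(StandardCharsets.UTF_8));
        long checkpointedLength = "committed".length(); // bytes covered by the checkpoint

        restore(part, checkpointedLength, true);
        System.out.println(new String(Files.readAllBytes(part), StandardCharsets.UTF_8)); // committed
    }
}
```

Readers of the non-truncating variant have to honor the marker and ignore any bytes past the recorded length.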
>>>>
>>>>
>>>> Am I better off using BucketingSink? When to use BucketingSink and when
>>>> to use StreamingFileSink is not clear at all, even though on the surface
>>>> StreamingFileSink sure looks like a better version of BucketingSink (or
>>>> not).
>>>>
>>>> Regards.
>>>>
>>>>
>>>>
>>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com>
>>>> wrote:
>>>>
>>>>> I think the only rolling policy that can be used to ensure exactly-once
>>>>> is OnCheckpointRollingPolicy.
>>>>>
>>>>> Tim
>>>>>
>>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Can StreamingFileSink be used instead of the BucketingSink described at
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html?
>>>>>> It looks like it could.
>>>>>>
>>>>>>
>>>>>> Take this code, for example:
>>>>>>
>>>>>>
>>>>>>         StreamingFileSink
>>>>>>                 .forRowFormat(new Path(PATH),
>>>>>>                         new SimpleStringEncoder<KafkaRecord>())
>>>>>>                 .withBucketAssigner(
>>>>>>                         new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>>                     @Override
>>>>>>                     public boolean shouldRollOnCheckpoint(
>>>>>>                             PartFileInfo<String> partFileState) throws IOException {
>>>>>>                         // Never roll just because a checkpoint is taken.
>>>>>>                         return false;
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public boolean shouldRollOnEvent(
>>>>>>                             PartFileInfo<String> partFileState,
>>>>>>                             KafkaRecord element) throws IOException {
>>>>>>                         // Roll once the part file exceeds 1 GiB.
>>>>>>                         return partFileState.getSize() > 1024L * 1024L * 1024L;
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public boolean shouldRollOnProcessingTime(
>>>>>>                             PartFileInfo<String> partFileState,
>>>>>>                             long currentTime) throws IOException {
>>>>>>                         // Roll after 10 minutes without writes, or once the
>>>>>>                         // part file is older than 2 hours.
>>>>>>                         return currentTime - partFileState.getLastUpdateTime() > 10L * 60L * 1000L
>>>>>>                                 || currentTime - partFileState.getCreationTime() > 120L * 60L * 1000L;
>>>>>>                     }
>>>>>>                 })
>>>>>>                 .build();
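The thresholds in the policy above can be checked in isolation. The sketch below copies its two conditions into plain static methods so it runs without Flink; RollThresholds and its method names are hypothetical. The constants use an upper-case L suffix, which is easier to tell apart from the digit 1 than a lower-case l.

```java
public class RollThresholds {
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L; // 1 GiB
    static final long INACTIVITY_MS = 10L * 60L * 1000L;           // 10 minutes
    static final long MAX_AGE_MS = 120L * 60L * 1000L;             // 2 hours

    /** Same condition as shouldRollOnEvent in the policy above. */
    static boolean rollOnEvent(long sizeBytes) {
        return sizeBytes > MAX_PART_SIZE_BYTES;
    }

    /** Same condition as shouldRollOnProcessingTime in the policy above. */
    static boolean rollOnProcessingTime(long now, long lastUpdate, long created) {
        return now - lastUpdate > INACTIVITY_MS || now - created > MAX_AGE_MS;
    }

    public static void main(String[] args) {
        long created = 0L;
        long lastUpdate = 5L * 60L * 1000L; // last write 5 minutes after creation
        // 14 minutes after creation: idle for 9 minutes, so no roll yet.
        System.out.println(rollOnProcessingTime(14L * 60L * 1000L, lastUpdate, created)); // false
        // 16 minutes after creation: idle for 11 minutes, so the inactivity rule fires.
        System.out.println(rollOnProcessingTime(16L * 60L * 1000L, lastUpdate, created)); // true
    }
}
```

Both comparisons are strict, so a file idle for exactly 10 minutes, or exactly 1 GiB in size, does not roll yet.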
>>>>>>
>>>>>>
>>>>>> A few things I see and am not sure I follow about the new
>>>>>> StreamingFileSink compared with BucketingSink:
>>>>>>
>>>>>>
>>>>>> 1. I never see the in-progress file move to the pending state (that is,
>>>>>>    get renamed as pending), as was the case with BucketingSink. I would
>>>>>>    assume it would become pending and then be finalized on checkpoint, for
>>>>>>    exactly-once semantics?
>>>>>>
>>>>>>
>>>>>> 2. I see dangling in-progress files at the end of the day. I would assume
>>>>>>    that, with withBucketCheckInterval set to 1 minute by default,
>>>>>>    shouldRollOnProcessingTime should kick in?
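Assuming the processing-time callback is invoked on a fixed 1-minute cadence (the default interval mentioned in question 2) and only the inactivity rule from the policy applies, the check at which an idle part file first qualifies for rolling is simple arithmetic. firstRollingCheck is a hypothetical helper for that calculation, not a Flink API.

```java
public class BucketCheckSimulation {

    /**
     * Returns the time of the first periodic check (every checkIntervalMs, starting at
     * firstCheckMs) at which the inactivity rule "now - lastUpdate > inactivityMs" holds.
     */
    static long firstRollingCheck(long firstCheckMs, long checkIntervalMs,
                                  long lastUpdateMs, long inactivityMs) {
        if (checkIntervalMs <= 0) {
            throw new IllegalArgumentException("check interval must be positive");
        }
        long now = firstCheckMs;
        while (now - lastUpdateMs <= inactivityMs) {
            now += checkIntervalMs; // wait for the next timer tick
        }
        return now;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Checks at minutes 1, 2, 3, ...; last write at minute 0; 10-minute inactivity rule.
        long t = firstRollingCheck(minute, minute, 0L, 10L * minute);
        System.out.println(t / minute); // 11
    }
}
```

Because the rule uses a strict greater-than, the check at minute 10 does not fire and the first qualifying check is at minute 11.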
>>>>>>
>>>>>> 3. The in-progress files are named like
>>>>>>    .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that
>>>>>>    additional suffix?
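To look at the name from question 3 more closely, the sketch below splits it into the visible part name, the two numbers and the trailing token. The regular expression and the Parsed class are hypothetical, and reading the numbers as subtask index and part counter is an assumption the thread does not confirm. The only thing the code establishes is that the suffix parses as a version-4 (random) UUID.

```java
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class InProgressName {
    // Hypothetical pattern for names like ".part-4-45.inprogress.<token>".
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.(part-(\\d+)-(\\d+))\\.inprogress\\.(.+)$");

    static final class Parsed {
        final String partName; // e.g. "part-4-45"
        final int first;       // 4 (assumed: subtask index)
        final long second;     // 45 (assumed: part counter)
        final UUID suffix;     // trailing token, parsed as a UUID

        Parsed(String partName, int first, long second, UUID suffix) {
            this.partName = partName;
            this.first = first;
            this.second = second;
            this.suffix = suffix;
        }
    }

    static Parsed parse(String fileName) {
        Matcher m = IN_PROGRESS.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("not an in-progress part file: " + fileName);
        }
        // UUID.fromString throws IllegalArgumentException if the token is not a UUID.
        return new Parsed(m.group(1), Integer.parseInt(m.group(2)),
                Long.parseLong(m.group(3)), UUID.fromString(m.group(4)));
    }

    public static void main(String[] args) {
        Parsed p = parse(".part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14");
        System.out.println(p.partName + " " + p.suffix.version()); // part-4-45 4
    }
}
```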
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have the following setup on the environment:
>>>>>>
>>>>>> // fixedDelayRestart is statically imported from
>>>>>> // org.apache.flink.api.common.restartstrategy.RestartStrategies
>>>>>> env.enableCheckpointing(10 * 60000); // checkpoint every 10 minutes
>>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>>> env.setRestartStrategy(fixedDelayRestart(4,
>>>>>>         org.apache.flink.api.common.time.Time.minutes(1)));
>>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>>> env.setStateBackend(stateBackEnd);
>>>>>>
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
