Hi all! I think it is time to rethink the Sink API as a whole, like we did with the Source API in FLIP-27. It would be nice to have a proper design that handles all this consistently, rather than adding one more hook.

For example:

  - For batch, you can already use the existing "finalize on master" hook
  - For streaming, it is tricky to "commit on end-of-stream" reliably (tolerating failures)
  - write-ahead versus direct writing
  - transactional versus idempotent
  - temp files and renaming versus recoverable writer

==> All these things have special cases currently, rather than a coherent design.

Best,
Stephan

On Tue, Sep 10, 2019 at 6:40 AM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Jingsong,
>
> Good point!
>
> 1. If it doesn't matter which task performs the finalize work, then I
> think task-0 suggested by Jark is a very good solution.
> 2. If it requires the last finished task to perform the finalize work,
> then we have to consider other solutions.
> WRT fault tolerance of StreamingRuntimeContext#getGlobalAggregateManager,
> AFAIK, there is no built-in support.
> 1) Regarding TM failover, I think it's not a problem. We can use an
> accumulator, say finish_count, which is increased by 1 when a sub-task
> finishes (i.e. its close() method is called).
> When finish_count == RuntimeContext.getNumberOfParallelSubtasks()
> for some sub-task, we know that it's the last finished sub-task.
> This holds true even in case of TM failover.
> 2) Regarding JM failover, I have no idea how to work around it so
> far. Maybe @Jamie Grier, who is the author of this feature, could share more
> thoughts. Not sure if there is already a solution/plan to support JM failover,
> or whether this feature is not designed for this kind of use case?
>
> Regards,
> Dian
>
> > On Sep 9, 2019, at 3:08 PM, shimin yang <ysmcl...@gmail.com> wrote:
> >
> > Hi Jingsong,
> >
> > Although it would be nice if the accumulators in GlobalAggregateManager
> > were fault-tolerant, we could still take advantage of managed state to
> > guarantee the semantics and use the accumulators to implement a distributed
> > barrier or lock to solve the distributed access problem.
> >
> > Best,
> > Shimin
> >
> > JingsongLee <lzljs3620...@aliyun.com.invalid> wrote on Mon, Sep 9, 2019 at 1:33 PM:
> >
> >> Thanks Jark and Dian:
> >> 1. Jark's approach: do the work in task-0. Simple way.
> >> 2. Dian's approach: use StreamingRuntimeContext#getGlobalAggregateManager.
> >> Can do more operations. But these accumulators are not fault-tolerant?
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >>
> >> ------------------------------------------------------------------
> >> From: shimin yang <ysmcl...@gmail.com>
> >> Send Time: Fri, Sep 6, 2019, 15:21
> >> To: dev <dev@flink.apache.org>
> >> Subject: Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete
> >>
> >> Hi Fu,
> >>
> >> That'll be nice.
> >>
> >> Thanks.
> >>
> >> Best,
> >> Shimin
> >>
> >> Dian Fu <dian0511...@gmail.com> wrote on Fri, Sep 6, 2019 at 3:17 PM:
> >>
> >>> Hi Shimin,
> >>>
> >>> It can be guaranteed to be an atomic operation. This is ensured by the
> >>> RPC framework. You could take a look at RpcEndpoint for more details.
> >>>
> >>> Regards,
> >>> Dian
> >>>
> >>>> On Sep 6, 2019, at 2:35 PM, shimin yang <ysmcl...@gmail.com> wrote:
> >>>>
> >>>> Hi Fu,
> >>>>
> >>>> Thank you for the reminder. I think it would work in my case as long as
> >>>> it's an atomic operation.
> >>>>
> >>>> Dian Fu <dian0511...@gmail.com> wrote on Fri, Sep 6, 2019 at 2:22 PM:
> >>>>
> >>>>> Hi Jingsong,
> >>>>>
> >>>>> Thanks for bringing up this discussion. You can try to look at the
> >>>>> GlobalAggregateManager to see if it can meet your requirements. It can
> >>>>> be obtained via StreamingRuntimeContext#getGlobalAggregateManager().
> >>>>>
> >>>>> Regards,
> >>>>> Dian
> >>>>>
> >>>>>> On Sep 6, 2019, at 1:39 PM, shimin yang <ysmcl...@gmail.com> wrote:
> >>>>>>
> >>>>>> Hi Jingsong,
> >>>>>>
> >>>>>> Big fan of this idea. We faced the same problem and resolved it by
> >>>>>> adding a distributed lock. It would be nice to have this feature in
> >>>>>> JobMaster, which can replace the lock.
> >>>>>>
> >>>>>> Best,
> >>>>>> Shimin
> >>>>>>
> >>>>>> JingsongLee <lzljs3620...@aliyun.com.invalid> wrote on Fri, Sep 6, 2019 at 12:20 PM:
> >>>>>>
> >>>>>>> Hi devs:
> >>>>>>>
> >>>>>>> I am trying to implement a streaming file sink for Table [1], like
> >>>>>>> StreamingFileSink.
> >>>>>>> If the underlying format is a HiveFormat, or a format that updates
> >>>>>>> visibility through a metaStore, I have to update the metaStore in
> >>>>>>> notifyCheckpointComplete, but this operation occurs on the task side,
> >>>>>>> which leads to distributed access to the metaStore, which in turn
> >>>>>>> becomes a bottleneck.
> >>>>>>>
> >>>>>>> So I'm curious if we can support notifyOnMaster for
> >>>>>>> notifyCheckpointComplete like FinalizeOnMaster.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> [1]
> >>>>>>> https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jingsong Lee
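
For readers following the finish_count idea from Dian's Sep 10 message, here is a minimal sketch of the counting logic only. It does not use Flink: plain threads stand in for sub-tasks, and an AtomicInteger stands in for an aggregate kept on the JobMaster. The names FinishCounter, reportFinished and LastSubtaskFinalizeDemo are hypothetical and chosen for illustration. TM and JM failover, which the thread leaves as an open question, are not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a shared "finish_count" accumulator.
// In the thread this would live on the JobMaster; here it is an in-process counter.
final class FinishCounter {
    private final int parallelism;
    private final AtomicInteger finished = new AtomicInteger();

    FinishCounter(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.parallelism = parallelism;
    }

    // Each sub-task calls this once when it finishes (e.g. from close()).
    // The increment is atomic, so exactly one caller observes the final count.
    boolean reportFinished() {
        return finished.incrementAndGet() == parallelism;
    }
}

final class LastSubtaskFinalizeDemo {
    // Runs `parallelism` simulated sub-tasks and returns the indices of those
    // that were told to perform the finalize step.
    static List<Integer> run(int parallelism) {
        FinishCounter counter = new FinishCounter(parallelism);
        List<Integer> finalizers = Collections.synchronizedList(new ArrayList<>());
        Thread[] subtasks = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            final int subtaskIndex = i;
            subtasks[i] = new Thread(() -> {
                if (counter.reportFinished()) {
                    // Only the last finished sub-task reaches this branch.
                    finalizers.add(subtaskIndex);
                }
            });
            subtasks[i].start();
        }
        for (Thread t : subtasks) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sub-tasks", e);
            }
        }
        return finalizers;
    }

    public static void main(String[] args) {
        List<Integer> finalizers = run(8);
        System.out.println("sub-tasks that ran the finalize step: " + finalizers.size());
    }
}
```

Which sub-task ends up last varies from run to run, but only one of them sees the count reach the parallelism. Jark's task-0 suggestion needs no counter at all: the sub-task with index 0 does the work, which fits the case where it does not matter which task finalizes.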