Hi Theo,

Thank you for sharing your solution. From your description, it seems that
what you need is a listener that is notified of state changes of a
partition/bucket: created/updated/closed (perhaps you don't need the close
notification). I am not familiar with Impala, so what I would like to know
is why you need to be notified every time a partition receives new data.
Could you give a more detailed description?

Best,
Guowei
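A minimal sketch of what such a partition/bucket lifecycle listener could
look like follows; the interface name and callbacks are hypothetical,
chosen purely for illustration, and do not exist in Flink's API:

    // Hypothetical bucket-lifecycle listener; all names are illustrative.
    public interface BucketLifecycleListener<BucketID> {

        // Called when a bucket is first created on some subtask.
        void onBucketCreated(BucketID bucket);

        // Called after a checkpoint commits new files into the bucket.
        void onBucketUpdated(BucketID bucket, long checkpointId);

        // Called once the bucket is considered terminated on all subtasks.
        // (Per the note above, some users may not need this callback.)
        void onBucketClosed(BucketID bucket);
    }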
Theo Diefenthal <theo.diefent...@scoop-software.de> wrote on Wed, May 13, 2020 at 12:00 AM:

> Hi Yun,
>
> For me, that sounds quite nice. I implemented the same for my application
> a few weeks ago, but of course tailored only to my app.
> What I did:
> 1. I wrapped the Parquet StreamingFileSink into a ProcessFunction.
> 2. I extended the default ProcessOperator and, instead of
> "notifyCheckpointComplete(long checkpointId)", provided my
> WrappedProcessFunction with a "notifyCheckpointComplete(checkpointId,
> lastCommitWatermark)".
> 3. I added a custom sink with parallelism 1 behind the
> WrappedProcessFunction.
> 4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a
> message downstream to the parallelism-1 sink containing data about which
> partitions were written to since the last checkpoint.
> 5. In the parallelism-1 sink, I make sure that I receive the messages from
> all upstream tasks (I give the constructor an int parameter telling it the
> parallelism of the WrappedProcessFunction) and then perform my
> parallelism-1 operation, in my case telling Impala which partitions were
> added or received new data. Luckily, in the case of Impala, that operation
> can be made idempotent, so I only needed to make sure that I have
> at-least-once processing from the state perspective here.
>
> I had to go with notifyCheckpointComplete because only at that point are
> the Parquet files finally committed and thus available to Spark, Impala
> and so on.
>
> So if you go on with that issue, I'd be really happy to be able to
> customize the solution and e.g. get rid of my custom setup by only
> specifying a kind of lambda function to be run with parallelism 1 that
> updates Impala. That function would, however, still need the information
> about which partitions were updated/added.
> And in my case, I was not really interested in the watermark (I sent it
> downstream only for metric purposes) but want to tell Impala after each
> commit which partitions changed, regardless of the watermark value.
>
> Best regards
> Theo
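A condensed sketch of steps 4 and 5 of the pattern Theo describes above --
a message type carrying the committed partitions per checkpoint, plus the
parallelism-1 committer sink that waits until it has heard from every
upstream subtask -- might look roughly like this. All class names and the
Impala call are illustrative, and the checkpointing of the sink's pending
state (needed to survive restarts) is omitted for brevity:

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Emitted by each WrappedProcessFunction subtask from notifyCheckpointComplete.
    public class PartitionsCommitted implements Serializable {
        public long checkpointId;
        public int subtaskIndex;
        public Set<String> partitions; // partitions written since the last checkpoint
    }

    // Parallelism-1 sink: collects one message per upstream subtask for each
    // checkpoint, then performs the global action once per checkpoint
    // (at-least-once across restarts, so the action must be idempotent).
    class ImpalaCommitSink extends RichSinkFunction<PartitionsCommitted> {

        private final int upstreamParallelism;
        private final Map<Long, List<PartitionsCommitted>> pending = new HashMap<>();

        ImpalaCommitSink(int upstreamParallelism) {
            this.upstreamParallelism = upstreamParallelism;
        }

        @Override
        public void invoke(PartitionsCommitted msg, Context context) {
            List<PartitionsCommitted> seen =
                    pending.computeIfAbsent(msg.checkpointId, id -> new ArrayList<>());
            seen.add(msg);
            if (seen.size() == upstreamParallelism) { // heard from every subtask
                Set<String> changed = new HashSet<>();
                seen.forEach(m -> changed.addAll(m.partitions));
                refreshImpalaPartitions(changed); // idempotent external call
                pending.remove(msg.checkpointId);
            }
        }

        private void refreshImpalaPartitions(Set<String> partitions) {
            // Illustrative: e.g. issue ALTER TABLE ... ADD IF NOT EXISTS PARTITION /
            // REFRESH statements against Impala over JDBC; details omitted.
        }
    }

Because notifyCheckpointComplete gives only at-least-once guarantees, the
external call must stay idempotent, exactly as Theo notes.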
> ------------------------------
> From: "Yun Gao" <yungao...@aliyun.com>
> To: "Robert Metzger" <rmetz...@apache.org>, "Jingsong Li" <jingsongl...@gmail.com>
> CC: "Peter Groesbeck" <peter.groesb...@gmail.com>, "user" <user@flink.apache.org>
> Sent: Tuesday, May 12, 2020 10:36:59
> Subject: Re: Writing _SUCCESS Files (Streaming and Batch)
>
> Hi Peter,
>
> Sorry for missing the question and responding late. I'm currently working
> together with Jingsong on the issue of supporting "global committing"
> (like writing a _SUCCESS file or adding partitions to the Hive store)
> after buckets are terminated. In 1.11 we may first support watermark/time
> related buckets in the Table/SQL API, and we are also thinking of
> supporting "global committing" for arbitrary bucket-assigner policies for
> StreamingFileSink users. The current rough thought is to let users specify
> when a bucket is terminated on a single task; the OperatorCoordinator[1]
> of the sink will then aggregate the information about this bucket from all
> subtasks and do the global committing once the bucket has finished on all
> subtasks. This is still under thinking and discussion, though. Any
> thoughts or requirements on this issue are warmly welcome.
>
> Best,
> Yun
>
> [1] OperatorCoordinator is introduced in FLIP-27:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface.
> It is a component that resides in the JobManager and can communicate with
> all the subtasks of the corresponding operator; thus it can be used to
> aggregate status from the subtasks.
>
> ------------------ Original Message ------------------
> From: Robert Metzger <rmetz...@apache.org>
> Sent: Tue May 12 15:36:26 2020
> To: Jingsong Li <jingsongl...@gmail.com>
> Cc: Peter Groesbeck <peter.groesb...@gmail.com>, user <user@flink.apache.org>
> Subject: Re: Writing _SUCCESS Files (Streaming and Batch)
>
>> Hi Peter,
>> I filed a ticket for this feature request:
>> https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your
>> thoughts / requirements to the ticket)
>>
>> Best,
>> Robert
>>
>>
>> On Wed, May 6, 2020 at 3:41 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>
>>> Hi Peter,
>>> The troublesome part is knowing when a bucket has "ended" in a
>>> streaming job.
>>> In 1.11, we are trying to implement a watermark-related bucket-ending
>>> mechanism[1] in Table/SQL.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <peter.groesb...@gmail.com> wrote:
>>>
>>>> I am replacing an M/R job with a streaming job using the
>>>> StreamingFileSink, and there is a requirement to generate an empty
>>>> _SUCCESS file like the old Hadoop job did. I also have to implement a
>>>> similar batch job to read from backup files in case of outages or
>>>> downtime.
>>>>
>>>> The batch-job question was answered here and appears to still be
>>>> relevant, although a confirmation would be great:
>>>> https://stackoverflow.com/a/39413810
>>>>
>>>> The question for the streaming job came up back in 2018 here:
>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3cff74eed5-602f-4eaa-9bc1-6cdf56611...@gmail.com%3E
>>>>
>>>> But the solution of using or extending the BucketingSink class seems
>>>> out of date now that BucketingSink has been deprecated.
>>>>
>>>> Is there a way to implement a similar solution for StreamingFileSink?
>>>>
>>>> I'm currently on 1.8.1, although I hope to update to 1.10 in the near
>>>> future.
>>>>
>>>> Thank you,
>>>> Peter
>>>>
>>>
>>> --
>>> Best,
>>> Jingsong Lee
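As a footnote to the original _SUCCESS question: once a "bucket finished"
signal exists (via whichever mechanism discussed above), the global
committing step itself is small. A minimal sketch using Flink's filesystem
abstraction, assuming the bucket directory path is already known:

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.core.fs.Path;

    // Writes an empty _SUCCESS marker into a finished bucket directory.
    public final class SuccessFileCommitter {

        public static void writeSuccessFile(String bucketDir) throws Exception {
            Path marker = new Path(bucketDir, "_SUCCESS");
            FileSystem fs = marker.getFileSystem();
            // OVERWRITE keeps the commit idempotent: re-running it after a
            // failure just re-creates the same empty marker file.
            fs.create(marker, WriteMode.OVERWRITE).close();
        }
    }

Such a call would naturally live in a parallelism-1 committer like the one
sketched further up, or, in the design Yun describes, be driven by the
sink's OperatorCoordinator.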