Hi Yun, 

That sounds quite nice to me. I implemented the same for my application a few
weeks ago, though of course tailored only to my app.
What I did: 
1. I wrapped the Parquet StreamingFileSink into a ProcessFunction.
2. I extended the default ProcessOperator and, instead of
"notifyCheckpointComplete(long checkpointId)", provided my
WrappedProcessFunction with a "notifyCheckpointComplete(checkpointId,
lastCommitWatermark)" (a rough sketch follows after this list).
3. I added a custom sink with parallelism 1 behind the WrappedProcessFunction. 
4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a
message downstream to the parallelism-1 sink containing data about which
partitions were written to since the last checkpoint.
5. In the parallelism-1 sink, I make sure that I get the messages from all
upstream tasks (the constructor gets an int parameter telling it the
parallelism of the WrappedProcessFunction) and then perform my parallelism-1
operation, in my case telling Impala which partitions were added or got new
data. Luckily, in the case of Impala, that operation can be made idempotent,
so I only needed to make sure that I have at-least-once processing from the
state perspective here.
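
For illustration, here is roughly how steps 2 and 4 fit together, assuming
the Flink 1.8-era operator APIs. WrappedProcessFunction and the commit
message type are my own names, nothing from Flink, so treat this as a sketch
rather than a drop-in implementation:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical function type: a ProcessFunction that is additionally told
// about completed checkpoints and answers with the commit message to forward.
abstract class WrappedProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT> {
    public abstract OUT notifyCheckpointComplete(long checkpointId, long lastCommitWatermark);
}

public class WrappedProcessOperator<IN, OUT> extends ProcessOperator<IN, OUT> {

    private long lastSeenWatermark = Long.MIN_VALUE;

    public WrappedProcessOperator(WrappedProcessFunction<IN, OUT> function) {
        super(function);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        lastSeenWatermark = mark.getTimestamp(); // remembered for the next commit
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        // Extended hook (step 2): hand checkpoint id and watermark to the
        // function, which answers with the partitions written since the last
        // checkpoint ...
        OUT commitMessage = ((WrappedProcessFunction<IN, OUT>) userFunction)
                .notifyCheckpointComplete(checkpointId, lastSeenWatermark);
        // ... and send that downstream to the parallelism-1 sink (step 4).
        output.collect(new StreamRecord<>(commitMessage));
    }
}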

I had to go for notifyCheckpointComplete, as only there are the Parquet files
ultimately committed and thus available to Spark, Impala, and so on.
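
A similarly rough sketch of the parallelism-1 sink from steps 3 and 5.
CommitMessage is an invented type, and for brevity the bookkeeping lives in
plain fields here instead of in Flink state (my real setup keeps it in state
to get the at-least-once guarantee mentioned above):

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Invented message type carrying what one subtask committed for a checkpoint.
class CommitMessage implements Serializable {
    long checkpointId;
    int subtaskIndex;
    Set<String> changedPartitions;
}

public class ImpalaCommitSink extends RichSinkFunction<CommitMessage> {

    private final int upstreamParallelism;
    // checkpoint id -> subtasks heard from / partitions collected so far
    private final Map<Long, Set<Integer>> seenSubtasks = new HashMap<>();
    private final Map<Long, Set<String>> changedPartitions = new HashMap<>();

    public ImpalaCommitSink(int upstreamParallelism) {
        this.upstreamParallelism = upstreamParallelism;
    }

    @Override
    public void invoke(CommitMessage msg, Context context) {
        seenSubtasks.computeIfAbsent(msg.checkpointId, id -> new HashSet<>())
                .add(msg.subtaskIndex);
        changedPartitions.computeIfAbsent(msg.checkpointId, id -> new HashSet<>())
                .addAll(msg.changedPartitions);
        // Act only once every upstream subtask has reported for this checkpoint.
        if (seenSubtasks.get(msg.checkpointId).size() == upstreamParallelism) {
            refreshImpala(changedPartitions.remove(msg.checkpointId));
            seenSubtasks.remove(msg.checkpointId);
        }
    }

    private void refreshImpala(Set<String> partitions) {
        // idempotent: e.g. issue a REFRESH per changed partition via JDBC
    }
}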

So if you go on with that issue, I'd be really happy to be able to customize
the solution and, e.g., get rid of my custom setup by only specifying a kind
of lambda function which would be run with parallelism 1 and update Impala.
That function would, however, still need the info about which partitions were
updated/added. In my case, I was not really interested in the watermark (I
sent it downstream only for metric purposes) but want to tell Impala after
each commit which partitions changed, regardless of the watermark's value.
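
To illustrate, an interface as small as the following would cover my case.
This is imagined API only, nothing like it exists in Flink today:

import java.util.Set;

// Imagined hook: a function the sink runs with parallelism 1 after each
// successful global commit.
@FunctionalInterface
public interface GlobalCommitFunction {
    // partitions that were added or received new data since the last commit
    void commit(Set<String> changedPartitions) throws Exception;
}

My implementation would then simply issue the Impala refresh for those
partitions.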

Best regards 
Theo 


Von: "Yun Gao" <yungao...@aliyun.com> 
An: "Robert Metzger" <rmetz...@apache.org>, "Jingsong Li" 
<jingsongl...@gmail.com> 
CC: "Peter Groesbeck" <peter.groesb...@gmail.com>, "user" 
<user@flink.apache.org> 
Gesendet: Dienstag, 12. Mai 2020 10:36:59 
Betreff: 回复:Re: Writing _SUCCESS Files (Streaming and Batch) 

Hi Peter, 

Sorry for missing the question and responding late. I'm currently working
together with Jingsong on the issue of supporting "global committing" (like
writing a _SUCCESS file or adding partitions to the Hive store) after buckets
are terminated. In 1.11 we may first support watermark/time-related buckets in
the Table/SQL API, and we are also thinking of supporting "global committing"
for arbitrary bucket assigner policies for StreamingFileSink users. The
current rough thought is to let users specify when a bucket is terminated on a
single task; the OperatorCoordinator[1] of the sink will then aggregate the
information about this bucket from all subtasks and do the global committing
once the bucket has been finished on all the subtasks. This is still under
discussion, so any thoughts or requirements on this issue are warmly welcome.
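
To make the idea a bit more concrete, here is a minimal sketch of just the
aggregation logic in plain Java (not the actual OperatorCoordinator
interface, which is still evolving; the names are only illustrative):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class BucketCommitTracker {

    private final int parallelism;
    // bucket id -> subtasks that have reported the bucket as terminated
    private final Map<String, Set<Integer>> finishedSubtasks = new HashMap<>();

    public BucketCommitTracker(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Called for each "bucket terminated" event a subtask sends to the coordinator. */
    public void onBucketTerminated(String bucketId, int subtaskIndex) {
        Set<Integer> finished = finishedSubtasks
                .computeIfAbsent(bucketId, id -> new HashSet<>());
        finished.add(subtaskIndex);
        if (finished.size() == parallelism) {
            globallyCommit(bucketId); // e.g. write _SUCCESS, update the metastore
            finishedSubtasks.remove(bucketId);
        }
    }

    private void globallyCommit(String bucketId) {
        // the actual global committing action goes here
    }
}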

Best, 
Yun 


[1] OperatorCoordinator is introduced in FLIP-27:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
It is a component that resides in the JobManager and can communicate with all
the subtasks of the corresponding operator, so it can be used to aggregate
status from subtasks.




------------------ Original Message ------------------
From: Robert Metzger <rmetz...@apache.org>
Sent: Tue May 12 15:36:26 2020
To: Jingsong Li <jingsongl...@gmail.com>
CC: Peter Groesbeck <peter.groesb...@gmail.com>, user <user@flink.apache.org>
Subject: Re: Writing _SUCCESS Files (Streaming and Batch)

Hi Peter, 
I filed a ticket for this feature request:
https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your
thoughts / requirements to the ticket)

Best, 
Robert 


On Wed, May 6, 2020 at 3:41 AM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Peter, 
The troublesome part is how to know the "ending" of a bucket in a streaming job.
In 1.11, we are trying to implement a watermark-related bucket ending 
mechanism[1] in Table/SQL. 

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best, 
Jingsong Lee 

On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <peter.groesb...@gmail.com> wrote:

I am replacing an M/R job with a Streaming job using the StreamingFileSink and 
there is a requirement to generate an empty _SUCCESS file like the old Hadoop 
job. I have to implement a similar Batch job to read from backup files in case 
of outages or downtime. 

The Batch job question was answered here and appears to still be relevant,
although if someone could confirm that for me, that would be great:
https://stackoverflow.com/a/39413810

The question of the Streaming job came up back in 2018 here:
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3cff74eed5-602f-4eaa-9bc1-6cdf56611...@gmail.com%3E

But the solution to use or extend the BucketingSink class seems out of date now 
that BucketingSink has been deprecated. 

Is there a way to implement a similar solution for StreamingFileSink? 

I'm currently on 1.8.1, although I hope to update to 1.10 in the near future.

Thank you, 
Peter 





-- 
Best, Jingsong Lee 
