Hi, Theo
Thank you for sharing your solution.
>From your description, it seems that what you need is a listener that could
notify the state change of the partition/bucket: created/updated/closed.
(maybe you don't need the close notify).
I am not familiar with Impala. So what I want to know is why you need to be
notified when the partition got new data every time. Would you like to give
some detailed descriptions?

Best,
Guowei


Theo Diefenthal <theo.diefent...@scoop-software.de> 于2020年5月13日周三 上午12:00写道:

> Hi Yun,
>
> For me, that sounds quite nice. I implemented the same for my application
> a few weeks ago, but of course tailored only to my app.
> What I did:
> 1. I wrapped the Parquet-StreamingFileSink into a Process-Function.
> 2. I extended the default ProcessOperator and instead of
> "notifyCheckpointComplete(long checkpointId)", I provided my
> WrappedProcessFunction a "notifyCheckpointComplete(checkointId,
> lastCommitWatermark)".
> 3. I added a custom sink with parallelism 1 behind the
> WrappedProcessFunction.
> 4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a
> message downstream to the parallelism 1 sink containing data about which
> partitions were written to between in the phase to the last checkpoint.
> 5. In the parallelism 1 sink, I make sure that I get the messages from all
> upstream task (Give the constructor an int parameter telling it the
> parallelism of the WrappedProcessFunction) and then perform my parallelism
> 1 operation, in my case, telling Impala which partitions were added or got
> new data. Luckily, in case of Impala, that operation can be made idempotent
> so I only needed to make sure that I have an at least once processing from
> the state perspective here.
>
> I had to go for notifyCheckpointComplete as only there, the parquet files
> are ultimately committed and thus available for spark, impala and so on.
>
> So if you go on with that issue, I'd be really happy to be able to
> customize the solution and e.g. get rid of my custom setup by only
> specifiying kind of a lambda function which should be run with parallelism
> 1 and update impala. That function would however still need the info which
> partitions were updated/added.
> And in my case, I was not really interested in the watermark (I sent it
> downstream only for metric purposes) but want to tell impala after each
> commit which partitions changed, regardless of the value from the watermark.
>
> Best regards
> Theo
>
> ------------------------------
> *Von: *"Yun Gao" <yungao...@aliyun.com>
> *An: *"Robert Metzger" <rmetz...@apache.org>, "Jingsong Li" <
> jingsongl...@gmail.com>
> *CC: *"Peter Groesbeck" <peter.groesb...@gmail.com>, "user" <
> user@flink.apache.org>
> *Gesendet: *Dienstag, 12. Mai 2020 10:36:59
> *Betreff: *回复:Re: Writing _SUCCESS Files (Streaming and Batch)
>
> Hi Peter,
>
>     Sorry for missing the question and response later, I'm currently
> sworking together with Jingsong on the issue to support "global committing"
> (like writing _SUCCESS file or adding partitions to hive store) after
> buckets terminated. In 1.11 we may first support watermark/time related
> buckets in Table/SQL API, and we are also thinking of supporting "global
> committing" for arbitrary bucket assigner policy for StreamingFileSink
> users. The current rough thought is to let users specify when a bucket is
> terminated on a single task, and the OperatorCoordinator[1] of the sink
> will aggreate the information from all subtasks about this bucket and do
> the global committing if the bucket has been finished on all the subtasks,
> but this is still under thinking and discussion. Any thoughts or
> requirements on this issue are warmly welcome.
>
> Best,
>  Yun
>
>
> [1] OperatorCoordinator is introduced in FLIP-27:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface.
> This is a component resides in JobManager and could communicate with all
> the subtasks of the corresponding operator, thus it could be used to
> aggregate status from subtasks.
>
> ------------------原始邮件 ------------------
> *发件人:*Robert Metzger <rmetz...@apache.org>
> *发送时间:*Tue May 12 15:36:26 2020
> *收件人:*Jingsong Li <jingsongl...@gmail.com>
> *抄送:*Peter Groesbeck <peter.groesb...@gmail.com>, user <
> user@flink.apache.org>
> *主题:*Re: Writing _SUCCESS Files (Streaming and Batch)
>
>> Hi Peter,
>> I filed a ticket for this feature request:
>> https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your
>> thoughts / requirements to the ticket)
>>
>> Best,
>> Robert
>>
>>
>> On Wed, May 6, 2020 at 3:41 AM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Hi Peter,
>>> The troublesome is how to know the "ending" for a bucket in streaming
>>> job.
>>> In 1.11, we are trying to implement a watermark-related bucket ending
>>> mechanism[1] in Table/SQL.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <
>>> peter.groesb...@gmail.com> wrote:
>>>
>>>> I am replacing an M/R job with a Streaming job using the
>>>> StreamingFileSink and there is a requirement to generate an empty _SUCCESS
>>>> file like the old Hadoop job. I have to implement a similar Batch job to
>>>> read from backup files in case of outages or downtime.
>>>>
>>>> The Batch job question was answered here and appears to be still
>>>> relevant although if someone could confirm for me that would be great.
>>>> https://stackoverflow.com/a/39413810
>>>>
>>>> The question of the Streaming job came up back in 2018 here:
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3cff74eed5-602f-4eaa-9bc1-6cdf56611...@gmail.com%3E
>>>>
>>>> But the solution to use or extend the BucketingSink class seems out of
>>>> date now that BucketingSink has been deprecated.
>>>>
>>>> Is there a way to implement a similar solution for StreamingFileSink?
>>>>
>>>> I'm currently on 1.8.1 although I hope to update to 1.10 in the near
>>>> future.
>>>>
>>>> Thank you,
>>>> Peter
>>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>

Reply via email to