Hi, Paul Lam

Thanks for your attention.

1: Yes, you need to have a catalog to provide procedure

2: Actually, we don't need to manage stored procedures' life cycles. The whole 
calling for stored procedure is a synchronize process. Its procedure 
implementation to decide to wait the job finish or submit the job aynchronously 
and return the job id or other informations, so that the users can stop the job 
with the job id.

3: There's no much difference between them since they're all Flink jobs 
eventually. The main difference is that the job is submited by the 
corresponding procedure with the StreamExecutionEnvironment provided. 
Regarding to the table configurations, since we only pass 
StreamExecutionEnvironment to the procedure, although these configurations can 
be seen by streamExecEnv.getConfiguration().get(xxx) in 
StreamExecutionEnvironment, they aren't be effective as other sql statements 
since these table configurations are mainly be considerd by the 
TableEnvironment.

4: Yes, you're right.



Best regards,
Yuxia

----- 原始邮件 -----
发件人: "Paul Lam" <paullin3...@gmail.com>
收件人: "dev" <dev@flink.apache.org>
发送时间: 星期四, 2023年 6 月 01日 下午 6:34:24
主题: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure

Hi Yuxia,

+1 for the stored procedure, which would make Flink SQL much more extendable.

I have some concerns regarding the FLIP:

1. IIUC, stored procedures are bound to a catalog (I found no registration or 
discovery
    interfaces), does that mean a connector has to provide a dedicated catalog 
to support 
    stored procedures?

2. How to manage stored procedures' life cycles? Do we treat them as normal 
jobs and
    use job statements like `stop job xxx` to stop an unwanted procedure?

3. Stored procedures provided another approach to submitting Flink jobs apart 
from 
    insert/select statements that are based on ModifyOperation, it would be 
great if you
    could elaborate a bit more about the difference between them. For example, 
are
    table configurations like `table.dml-sync` or 
`table.exec.resource.default-parallelism` 
    still effective for stored procedures?

4. Flink may enforce single-job execution in application mode, in that case, I 
guess we 
    must execute one stored procedure with its dedicated Flink cluster, am I 
right?

Thanks!

Best,
Paul Lam

> 2023年6月1日 17:04,yuxia <luoyu...@alumni.sjtu.edu.cn> 写道:
> 
> Sorry for missing the importart part, repeat it agin:
> "But we can't get the `StreamExecutionEnvironment` which is the entrypoint to 
> build datastream from `TableEnvironment`;
> 
> Best regards,
> Yuxia
> 
> ----- 原始邮件 -----
> 发件人: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
> 收件人: "dev" <dev@flink.apache.org>
> 发送时间: 星期四, 2023年 6 月 01日 下午 4:56:15
> 主题: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
> 
> Hi, Benchao.
> Thanks for your attention.
> 
> Initially, I also want to pass `TableEnvironment` to procedure. But according 
> my investegation and offline discussion with Jingson, the real important 
> thing for procedure devs is the ability to build Flink datastream. But we 
> can't get the `StreamExecutionEnvironment` which is the entrypoint to build 
> datastream. That's to say we will lost the ability to build a datastream if 
> we just pass `TableEnvironment`.
> 
> Of course, we can also pass `TableEnvironment` along with 
> `StreamExecutionEnvironment` to Procedure. But I'm intend to be cautious 
> about exposing too much too early to procedure devs. If someday we find we 
> will need `TableEnvironment` to custom a procedure, we can then add a method 
> like `getTableEnvironment()` in `ProcedureContext`.
> 
> Best regards,
> Yuxia
> 
> ----- 原始邮件 -----
> 发件人: "Benchao Li" <libenc...@apache.org>
> 收件人: "dev" <dev@flink.apache.org>
> 发送时间: 星期四, 2023年 6 月 01日 下午 12:58:08
> 主题: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
> 
> Thanks Yuxia for opening this discussion,
> 
> The general idea looks good to me, I only have one question about the
> `ProcedureContext#getExecutionEnvironment`. Why are you proposing to return
> a `StreamExecutionEnvironment` instead of `TableEnvironment`, could you
> elaborate a little more on this?
> 
> Jingsong Li <jingsongl...@gmail.com> 于2023年5月30日周二 17:58写道:
> 
>> Thanks for your explanation.
>> 
>> We can support Iterable in future. Current design looks good to me.
>> 
>> Best,
>> Jingsong
>> 
>> On Tue, May 30, 2023 at 4:56 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>> 
>>> Hi, Jingsong.
>>> Thanks for your feedback.
>>> 
>>>> Does this need to be a function call? Do you have some example?
>>> I think it'll be useful to support function call when user call
>> procedure.
>>> The following example is from iceberg:[1]
>>> CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo',
>> 'bar'));
>>> 
>>> It allows user to use `map('foo', 'bar')` to pass a map data to
>> procedure.
>>> 
>>> Another case that I can imagine may be rollback a table to the snapshot
>> of one week ago.
>>> Then, with function call, user may call `rollback(table_name, now() -
>> INTERVAL '7' DAY)` to acheive such purpose.
>>> 
>>> Although it can be function call, the eventual parameter got by the
>> procedure will always be the literal evaluated.
>>> 
>>> 
>>>> Procedure looks like a TableFunction, do you consider using Collector
>>> something like TableFunction? (Supports large amount of data)
>>> 
>>> Yes, I had considered it. But returns T[] is for simpility,
>>> 
>>> First, regarding how to return the calling result of a procedure, it
>> looks more intuitive to me to use the return result of the `call` method
>> instead of by calling something like collector#collect.
>>> Introduce a collector will increase necessary complexity.
>>> 
>>> Second, regarding supporting large amount of data,  acoording my
>> investagtion, I haven't seen the requirement that supports returning large
>> amount of data.
>>> Iceberg also return an array.[2] If you do think we should support large
>> amount of data, I think we can change to return type from T[] to Iterable<T>
>>> 
>>> [1]: https://iceberg.apache.org/docs/latest/spark-procedures/#migrate
>>> [2]:
>> https://github.com/apache/iceberg/blob/601c5af9b6abded79dabeba177331310d5487f43/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/Procedure.java#L44
>>> 
>>> Best regards,
>>> Yuxia
>>> 
>>> ----- 原始邮件 -----
>>> 发件人: "Jingsong Li" <jingsongl...@gmail.com>
>>> 收件人: "dev" <dev@flink.apache.org>
>>> 发送时间: 星期一, 2023年 5 月 29日 下午 2:42:04
>>> 主题: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>>> 
>>> Thanks Yuxia for the proposal.
>>> 
>>>> CALL [catalog_name.][database_name.]procedure_name ([ expression [,
>> expression]* ] )
>>> 
>>> The expression can be a function call. Does this need to be a function
>>> call? Do you have some example?
>>> 
>>>> Procedure returns T[]
>>> 
>>> Procedure looks like a TableFunction, do you consider using Collector
>>> something like TableFunction? (Supports large amount of data)
>>> 
>>> Best,
>>> Jingsong
>>> 
>>> On Mon, May 29, 2023 at 2:33 PM yuxia <luoyu...@alumni.sjtu.edu.cn>
>> wrote:
>>>> 
>>>> Hi, everyone.
>>>> 
>>>> I’d like to start a discussion about FLIP-311: Support Call Stored
>> Procedure [1]
>>>> 
>>>> Stored procedure provides a convenient way to encapsulate complex
>> logic to perform data manipulation or administrative tasks in external
>> storage systems. It's widely used in traditional databases and popular
>> compute engines like Trino for it's convenience. Therefore, we propose
>> adding support for call stored procedure in Flink to enable better
>> integration with external storage systems.
>>>> 
>>>> With this FLIP, Flink will allow connector developers to develop their
>> own built-in stored procedures, and then enables users to call these
>> predefiend stored procedures.
>>>> 
>>>> Looking forward to your feedbacks.
>>>> 
>>>> [1]:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
>>>> 
>>>> Best regards,
>>>> Yuxia
>> 
> 
> 
> -- 
> 
> Best,
> Benchao Li

Reply via email to