Hi Yuxia,

+1 for stored procedures, which would make Flink SQL much more extensible.

I have some concerns regarding the FLIP:

1. IIUC, stored procedures are bound to a catalog (I found no registration or
    discovery interfaces). Does that mean a connector has to provide a dedicated
    catalog to support stored procedures?

2. How do we manage stored procedures' life cycles? Do we treat them as normal
    jobs and use job statements like `stop job xxx` to stop an unwanted
    procedure?

3. Stored procedures provide another approach to submitting Flink jobs, apart
    from insert/select statements, which are based on ModifyOperation. It would
    be great if you could elaborate a bit more on the difference between them.
    For example, are table configurations like `table.dml-sync` or
    `table.exec.resource.default-parallelism` still effective for stored
    procedures?

4. Flink may enforce single-job execution in application mode. In that case, I
    guess we must execute each stored procedure in its own dedicated Flink
    cluster, am I right?

Thanks!

Best,
Paul Lam

> On Jun 1, 2023, at 17:04, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> 
> Sorry for missing the important part. Let me repeat it:
> "But we can't get the `StreamExecutionEnvironment`, which is the entry point to
> build a datastream, from `TableEnvironment`."
> 
> Best regards,
> Yuxia
> 
> ----- Original Message -----
> From: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
> To: "dev" <dev@flink.apache.org>
> Sent: Thursday, June 1, 2023, 4:56:15 PM
> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
> 
> Hi, Benchao.
> Thanks for your attention.
> 
> Initially, I also wanted to pass `TableEnvironment` to the procedure. But
> according to my investigation and an offline discussion with Jingsong, what
> really matters to procedure devs is the ability to build a Flink datastream.
> But we can't get the `StreamExecutionEnvironment`, which is the entry point to
> build a datastream. That is to say, we would lose the ability to build a
> datastream if we just passed `TableEnvironment`.
> 
> Of course, we could also pass `TableEnvironment` along with
> `StreamExecutionEnvironment` to the procedure. But I intend to be cautious
> about exposing too much too early to procedure devs. If someday we find that
> we need `TableEnvironment` to customize a procedure, we can then add a method
> like `getTableEnvironment()` to `ProcedureContext`.
> 
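To make the design choice above concrete, here is a minimal sketch of a context that exposes only the stream environment. It assumes nothing about the final FLIP-311 interfaces: `StreamEnvStandIn` is a hypothetical stand-in for `StreamExecutionEnvironment`, and `CompactProcedure` and its `call` signature are invented for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these types are the real Flink classes. */
class ProcedureContextSketch {

    /** Stand-in for StreamExecutionEnvironment (assumption: it only records source names). */
    static final class StreamEnvStandIn {
        private final List<String> sources = new ArrayList<>();

        void addSource(String name) {
            sources.add(name);
        }

        List<String> sources() {
            return sources;
        }
    }

    /** Context handed to a procedure; for now it exposes only the stream environment. */
    interface ProcedureContext {
        StreamEnvStandIn getExecutionEnvironment();
        // A getTableEnvironment() accessor could be added here later if it turns out
        // to be needed (as a default method, so existing implementations keep working).
    }

    /** Hypothetical procedure that builds its pipeline through the context. */
    static final class CompactProcedure {
        String[] call(ProcedureContext context, String tableName) {
            context.getExecutionEnvironment().addSource("compact:" + tableName);
            return new String[] {"submitted compaction for " + tableName};
        }
    }

    public static void main(String[] args) {
        StreamEnvStandIn env = new StreamEnvStandIn();
        ProcedureContext context = () -> env;
        String[] result = new CompactProcedure().call(context, "db.sample");
        System.out.println(result[0]);
        System.out.println(env.sources());
    }
}
```

Keeping the context this narrow means a later addition only widens the API; nothing already handed to procedure devs has to be taken back.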
> Best regards,
> Yuxia
> 
> ----- Original Message -----
> From: "Benchao Li" <libenc...@apache.org>
> To: "dev" <dev@flink.apache.org>
> Sent: Thursday, June 1, 2023, 12:58:08 PM
> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
> 
> Thanks Yuxia for opening this discussion,
> 
> The general idea looks good to me. I only have one question, about
> `ProcedureContext#getExecutionEnvironment`: why are you proposing to return
> a `StreamExecutionEnvironment` instead of a `TableEnvironment`? Could you
> elaborate a little more on this?
> 
> Jingsong Li <jingsongl...@gmail.com> wrote on Tue, May 30, 2023 at 17:58:
> 
>> Thanks for your explanation.
>> 
>> We can support Iterable in the future. The current design looks good to me.
>> 
>> Best,
>> Jingsong
>> 
>> On Tue, May 30, 2023 at 4:56 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>> 
>>> Hi, Jingsong.
>>> Thanks for your feedback.
>>> 
>>>> Does this need to be a function call? Do you have some example?
>>> I think it'll be useful to support function calls when a user calls a
>>> procedure. The following example is from Iceberg [1]:
>>> CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'));
>>>
>>> It allows the user to use `map('foo', 'bar')` to pass map data to the
>>> procedure.
>>>
>>> Another case I can imagine is rolling back a table to the snapshot of one
>>> week ago. With a function call, the user can call
>>> `rollback(table_name, now() - INTERVAL '7' DAY)` to achieve this.
>>>
>>> Although an argument can be a function call, the parameter that the
>>> procedure eventually receives will always be the evaluated literal.
>>> 
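As a rough illustration of the last point, the sketch below separates evaluating the argument expression from invoking the procedure, so the procedure only ever sees a concrete value. The `evaluate` helper and the `rollback` signature are hypothetical; they stand in for whatever the planner and a connector would actually provide.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical sketch of "arguments are evaluated before the procedure runs". */
class LiteralArgumentSketch {

    /** Stand-in for the planner reducing `now() - INTERVAL '7' DAY` to a literal. */
    static LocalDateTime evaluate(LocalDateTime now, Duration interval) {
        return now.minus(interval);
    }

    /** Hypothetical procedure: it receives a plain timestamp, never an expression. */
    static String[] rollback(String tableName, LocalDateTime snapshotTime) {
        return new String[] {tableName + " rolled back to " + snapshotTime};
    }

    public static void main(String[] args) {
        // A fixed "now" keeps the example deterministic.
        LocalDateTime now = LocalDateTime.of(2023, 5, 30, 16, 56);
        LocalDateTime literal = evaluate(now, Duration.ofDays(7));
        System.out.println(rollback("db.sample", literal)[0]);
    }
}
```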
>>> 
>>>> Procedure looks like a TableFunction, do you consider using Collector
>>>> something like TableFunction? (Supports large amount of data)
>>> 
>>> Yes, I had considered it. But returning T[] is for simplicity.
>>>
>>> First, regarding how to return the result of calling a procedure, it looks
>>> more intuitive to me to use the return value of the `call` method than to
>>> call something like collector#collect.
>>> Introducing a collector would add unnecessary complexity.
>>>
>>> Second, regarding support for large amounts of data: according to my
>>> investigation, I haven't seen a requirement to return large amounts of data.
>>> Iceberg also returns an array [2]. If you do think we should support large
>>> amounts of data, I think we can change the return type from T[] to
>>> Iterable<T>.
>>> 
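The three result styles discussed here can be compared side by side. This is only a sketch under assumptions: the method names are invented, and `Consumer<String>` stands in for a Flink-style collector rather than reproducing any real interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical comparison of the result styles discussed in the thread. */
class ProcedureResultSketch {

    /** Style proposed in the thread: the result is simply the return value. */
    static String[] callReturningArray(String tableName) {
        return new String[] {tableName + ": snapshot-1", tableName + ": snapshot-2"};
    }

    /** Collector style, similar in spirit to TableFunction: rows are pushed to a callback. */
    static void callWithCollector(String tableName, Consumer<String> collector) {
        collector.accept(tableName + ": snapshot-1");
        collector.accept(tableName + ": snapshot-2");
    }

    /**
     * Possible later variant returning Iterable<T>. Here it just wraps the array;
     * a real implementation could produce rows lazily instead.
     */
    static Iterable<String> callReturningIterable(String tableName) {
        return Arrays.asList(callReturningArray(tableName));
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(callReturningArray("db.sample")));

        List<String> collected = new ArrayList<>();
        callWithCollector("db.sample", collected::add);
        System.out.println(collected);

        for (String row : callReturningIterable("db.sample")) {
            System.out.println(row);
        }
    }
}
```

The array version needs no extra type and reads like an ordinary method call, which is the simplicity argument; the `Iterable` variant has the same call-site shape, which is why switching to it later looks cheap.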
>>> [1]: https://iceberg.apache.org/docs/latest/spark-procedures/#migrate
>>> [2]: https://github.com/apache/iceberg/blob/601c5af9b6abded79dabeba177331310d5487f43/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/Procedure.java#L44
>>> 
>>> Best regards,
>>> Yuxia
>>> 
>>> ----- Original Message -----
>>> From: "Jingsong Li" <jingsongl...@gmail.com>
>>> To: "dev" <dev@flink.apache.org>
>>> Sent: Monday, May 29, 2023, 2:42:04 PM
>>> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>>> 
>>> Thanks Yuxia for the proposal.
>>> 
>>>> CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )
>>> 
>>> The expression can be a function call. Does this need to be a function
>>> call? Do you have some example?
>>> 
>>>> Procedure returns T[]
>>> 
>>> Procedure looks like a TableFunction. Have you considered using a Collector,
>>> as TableFunction does? (It supports large amounts of data.)
>>> 
>>> Best,
>>> Jingsong
>>> 
>>> On Mon, May 29, 2023 at 2:33 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>> 
>>>> Hi, everyone.
>>>> 
>>>> I'd like to start a discussion about FLIP-311: Support Call Stored
>>>> Procedure [1].
>>>>
>>>> Stored procedures provide a convenient way to encapsulate complex logic to
>>>> perform data manipulation or administrative tasks in external storage
>>>> systems. They are widely used in traditional databases and popular compute
>>>> engines like Trino for their convenience. Therefore, we propose adding
>>>> support for calling stored procedures in Flink to enable better integration
>>>> with external storage systems.
>>>>
>>>> With this FLIP, Flink will allow connector developers to develop their own
>>>> built-in stored procedures, and then enable users to call these predefined
>>>> stored procedures.
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
>>>> 
>>>> Best regards,
>>>> Yuxia
>> 
> 
> 
> -- 
> 
> Best,
> Benchao Li
