Hi Yuxia,

+1 for the stored procedure, which would make Flink SQL much more extensible.
I have some concerns regarding the FLIP:

1. IIUC, stored procedures are bound to a catalog (I found no registration or discovery interfaces). Does that mean a connector has to provide a dedicated catalog to support stored procedures?

2. How do we manage the life cycle of stored procedures? Do we treat them as normal jobs and use job statements like `stop job xxx` to stop an unwanted procedure?

3. Stored procedures provide another way to submit Flink jobs, apart from insert/select statements that are based on ModifyOperation. It would be great if you could elaborate a bit more on the difference between them. For example, are table configurations like `table.dml-sync` or `table.exec.resource.default-parallelism` still effective for stored procedures?

4. Flink may enforce single-job execution in application mode. In that case, I guess we must execute each stored procedure with its own dedicated Flink cluster, am I right?

Thanks!

Best,
Paul Lam

> On June 1, 2023, at 17:04, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>
> Sorry for missing the important part; to repeat it:
> "But we can't get the `StreamExecutionEnvironment`, which is the entry point to build a datastream, from `TableEnvironment`."
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
> To: "dev" <dev@flink.apache.org>
> Sent: Thursday, June 1, 2023, 4:56:15 PM
> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>
> Hi, Benchao.
> Thanks for your attention.
>
> Initially, I also wanted to pass `TableEnvironment` to the procedure. But according to my investigation and an offline discussion with Jingsong, what really matters for procedure devs is the ability to build a Flink datastream. But we can't get the `StreamExecutionEnvironment`, which is the entry point to build a datastream. That is to say, we will lose the ability to build a datastream if we just pass `TableEnvironment`.
>
> Of course, we can also pass `TableEnvironment` along with `StreamExecutionEnvironment` to Procedure.
> But I intend to be cautious about exposing too much too early to procedure devs. If someday we find we need `TableEnvironment` to customize a procedure, we can then add a method like `getTableEnvironment()` to `ProcedureContext`.
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Benchao Li" <libenc...@apache.org>
> To: "dev" <dev@flink.apache.org>
> Sent: Thursday, June 1, 2023, 12:58:08 PM
> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>
> Thanks Yuxia for opening this discussion,
>
> The general idea looks good to me. I only have one question, about `ProcedureContext#getExecutionEnvironment`: why are you proposing to return a `StreamExecutionEnvironment` instead of a `TableEnvironment`? Could you elaborate a little more on this?
>
> Jingsong Li <jingsongl...@gmail.com> wrote on Tuesday, May 30, 2023 at 17:58:
>
>> Thanks for your explanation.
>>
>> We can support Iterable in the future. The current design looks good to me.
>>
>> Best,
>> Jingsong
>>
>> On Tue, May 30, 2023 at 4:56 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>
>>> Hi, Jingsong.
>>> Thanks for your feedback.
>>>
>>>> Does this need to be a function call? Do you have some example?
>>> I think it'll be useful to support function calls when a user calls a procedure.
>>> The following example is from Iceberg [1]:
>>> CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'));
>>>
>>> It allows the user to use `map('foo', 'bar')` to pass map data to the procedure.
>>>
>>> Another case I can imagine is rolling back a table to the snapshot from one week ago.
>>> Then, with a function call, the user may call `rollback(table_name, now() - INTERVAL '7' DAY)` to achieve this.
>>>
>>> Although the argument can be a function call, the parameter the procedure eventually receives will always be the evaluated literal.
>>>
>>>
>>>> Procedure looks like a TableFunction, do you consider using Collector something like TableFunction?
>>>> (Supports large amount of data)
>>>
>>> Yes, I had considered it. But returning T[] is for simplicity.
>>>
>>> First, regarding how to return the result of calling a procedure, it looks more intuitive to me to use the return value of the `call` method instead of calling something like collector#collect.
>>> Introducing a collector would add unnecessary complexity.
>>>
>>> Second, regarding supporting large amounts of data, according to my investigation, I haven't seen a requirement to return large amounts of data.
>>> Iceberg also returns an array [2]. If you do think we should support large amounts of data, I think we can change the return type from T[] to Iterable<T>.
>>>
>>> [1]: https://iceberg.apache.org/docs/latest/spark-procedures/#migrate
>>> [2]: https://github.com/apache/iceberg/blob/601c5af9b6abded79dabeba177331310d5487f43/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/Procedure.java#L44
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ----- Original Message -----
>>> From: "Jingsong Li" <jingsongl...@gmail.com>
>>> To: "dev" <dev@flink.apache.org>
>>> Sent: Monday, May 29, 2023, 2:42:04 PM
>>> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>>>
>>> Thanks Yuxia for the proposal.
>>>
>>>> CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )
>>>
>>> The expression can be a function call. Does this need to be a function call? Do you have some example?
>>>
>>>> Procedure returns T[]
>>>
>>> Procedure looks like a TableFunction, do you consider using Collector something like TableFunction? (Supports large amount of data)
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Mon, May 29, 2023 at 2:33 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>>
>>>> Hi, everyone.
>>>>
>>>> I’d like to start a discussion about FLIP-311: Support Call Stored Procedure [1]
>>>>
>>>> Stored procedures provide a convenient way to encapsulate complex logic to perform data manipulation or administrative tasks in external storage systems. They are widely used in traditional databases and popular compute engines like Trino for their convenience. Therefore, we propose adding support for calling stored procedures in Flink to enable better integration with external storage systems.
>>>>
>>>> With this FLIP, Flink will allow connector developers to develop their own built-in stored procedures, and then enable users to call these predefined stored procedures.
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
>>>>
>>>> Best regards,
>>>> Yuxia
>>
>
>
> --
>
> Best,
> Benchao Li
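
For readers following the thread, the design under discussion (a `call` method that receives a context plus already-evaluated arguments and returns `T[]`) might look roughly like the sketch below. This is only an illustration under stated assumptions, not the FLIP's actual API: `FakeStreamEnv`, `RollbackProcedure`, `CallSketch`, and the exact `call` signature are hypothetical names invented here, and `ProcedureContext` is reduced to the single `getExecutionEnvironment` method mentioned above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-in for Flink's StreamExecutionEnvironment, so that the
// sketch compiles without any Flink dependency.
final class FakeStreamEnv {
    private final String name;

    FakeStreamEnv(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Hypothetical context, modeled only on the `getExecutionEnvironment`
// method discussed in the thread.
interface ProcedureContext {
    FakeStreamEnv getExecutionEnvironment();
}

// Hypothetical procedure: the result is the return value of `call` (T[]),
// not something pushed through a collector.
final class RollbackProcedure {
    String[] call(ProcedureContext context, String tableName, Instant target) {
        // A real procedure could build a datastream job from the environment;
        // this sketch only checks that the environment is available.
        if (context.getExecutionEnvironment() == null) {
            throw new IllegalStateException("no execution environment");
        }
        return new String[] {"rolled back " + tableName + " to " + target};
    }
}

final class CallSketch {
    // Mimics `CALL rollback('db.sample', now() - INTERVAL '7' DAY)`:
    // the expression is evaluated first, and the procedure only ever
    // receives the resulting value.
    static String[] run(Instant now) {
        ProcedureContext context = () -> new FakeStreamEnv("sketch-env");
        Instant oneWeekAgo = now.minus(Duration.ofDays(7));
        return new RollbackProcedure().call(context, "db.sample", oneWeekAgo);
    }
}
```

Returning an array keeps the caller side trivial; switching the return type to `Iterable<T>`, as suggested in the thread, would only change the signature of `call` in this sketch.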