Sorry, I mean we can bump the Calcite version if needed in Flink 1.20.

On Fri, 22 Mar 2024 at 22:19, Jark Wu <imj...@gmail.com> wrote:
> Hi Timo,
>
> Introducing user-defined PTFs is very useful in Flink, I'm +1 for this. But I think the ML model FLIP is not blocked by this, because we can introduce ML_PREDICT and ML_EVALUATE as built-in PTFs just like TUMBLE/HOP, and support user-defined ML functions in a future FLIP.
>
> Regarding the simplified PTF syntax which drops the outer TABLE() keyword: it seems it was just supported [1] by the Calcite community last month and will be released in the next version (v1.37). The Calcite community is preparing the 1.37 release, so we can bump the version if needed in Flink 1.19.
>
> Best,
> Jark
>
> [1]: https://issues.apache.org/jira/browse/CALCITE-6254
>
> On Fri, 22 Mar 2024 at 21:46, Timo Walther <twal...@apache.org> wrote:
>
>> Hi everyone,
>>
>> this is a very important change to the Flink SQL syntax, but we can't wait until the SQL standard is ready for this. So I'm +1 on introducing the MODEL concept as a first-class citizen in Flink.
>>
>> For your information: over the past months I have already spent a significant amount of time thinking about how we can introduce PTFs in Flink. I reserved FLIP-440 [1] for this purpose and I will share a version of it in the next 1-2 weeks.
>>
>> For a good implementation of FLIP-440 and also FLIP-437, we should evolve the PTF syntax in collaboration with Apache Calcite.
>>
>> There are different syntax versions out there:
>>
>> 1) Flink
>>
>> SELECT * FROM
>>   TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
>>
>> 2) SQL standard
>>
>> SELECT * FROM
>>   TABLE(TUMBLE(TABLE(Bid), DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
>>
>> 3) Oracle
>>
>> SELECT * FROM
>>   TUMBLE(Bid, COLUMNS(bidtime), INTERVAL '10' MINUTES);
>>
>> As you can see above, Flink does not follow the standard correctly, as it would need to use `TABLE()`, but this is not provided by Calcite yet.
>>
>> I really like the Oracle syntax [2][3] a lot. It reduces the necessary keywords to a minimum. Personally, I would like to discuss this syntax in a separate FLIP and hope I will find supporters for:
>>
>> SELECT * FROM
>>   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);
>>
>> If we go entirely with the Oracle syntax, as you can see in the example, Oracle allows passing identifiers directly. This would solve our problems for the MODEL as well:
>>
>> SELECT f1, f2, label FROM ML_PREDICT(
>>   data => `my_data`,
>>   model => `classifier_model`,
>>   input => DESCRIPTOR(f1, f2));
>>
>> Or we completely adopt the Oracle syntax:
>>
>> SELECT f1, f2, label FROM ML_PREDICT(
>>   data => `my_data`,
>>   model => `classifier_model`,
>>   input => COLUMNS(f1, f2));
>>
>> What do you think?
>>
>> Happy to create a FLIP for just this syntax question and collaborate with the Calcite community on it. It shouldn't be too hard to convince them to support the Oracle syntax, at least as a parser parameter.
>>
>> Regards,
>> Timo
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-440%3A+User-defined+Polymorphic+Table+Functions
>> [2] https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/DBMS_TF.html#GUID-0F66E239-DE77-4C0E-AC76-D5B632AB8072
>> [3] https://oracle-base.com/articles/18c/polymorphic-table-functions-18c
>>
>> On 20.03.24 17:22, Mingge Deng wrote:
>> > Thanks Jark for all the insightful comments.
>> >
>> > We have updated the proposal per our offline discussions:
>> > 1. Model will be treated as a new relation in Flink SQL.
>> > 2. Include the common ML predict and evaluate functions in open-source Flink to complete the user journey. We should be able to extend the Calcite SqlTableFunction to support these two ML functions.
>> >
>> > Best,
>> > Mingge
>> >
>> > On Mon, Mar 18, 2024 at 7:05 PM Jark Wu <imj...@gmail.com> wrote:
>> >
>> >> Hi Hao,
>> >>
>> >>> I meant how the table name in window TVF gets translated to `SqlCallingBinding`. Probably we need to fetch the table definition from the catalog somewhere. Do we treat those window TVFs specially in parser/planner so that the catalog is looked up when they are seen?
>> >>
>> >> The table names are resolved and validated by the Calcite SqlValidator. We don't need to fetch from the catalog manually. The specific checking logic of the cumulate window happens in SqlCumulateTableFunction.OperandMetadataImpl#checkOperandTypes. The return type of SqlCumulateTableFunction is defined in the #getRowTypeInference() method. Both are public interfaces provided by Calcite, and it seems they are not specially handled in the parser/planner.
>> >>
>> >> I didn't try it, but my gut feeling is that the framework is ready for extending with a customized TVF.
>> >>
>> >>> As for what a model is, I'm wondering if it has to be a datatype or relation. Can it be another kind of citizen parallel to datatype/relation/function/db? Redshift also supports a `show models` operation, so it seems it's treated specially as well?
>> >>
>> >> If it is an entity only used in catalog scope (e.g., show xxx, create xxx, drop xxx), it is fine to introduce it. We have introduced such an entity before, the Module: "load module", "show modules" [1]. But if we want to use Model in TVF parameters, it has to be a relation or datatype, because those are the only things accepted there now.
>> >>
>> >> Thanks for sharing the reasons for preferring a TVF instead of the Redshift way. It sounds reasonable to me.
>> >>
>> >> Best,
>> >> Jark
>> >>
>> >> [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/modules/
>> >>
>> >> On Fri, 15 Mar 2024 at 13:41, Hao Li <h...@confluent.io.invalid> wrote:
>> >>
>> >>> Hi Jark,
>> >>>
>> >>> Thanks for the pointer. Sorry for the confusion: I meant how the table name in window TVF gets translated to `SqlCallingBinding`. Probably we need to fetch the table definition from the catalog somewhere. Do we treat those window TVFs specially in parser/planner so that the catalog is looked up when they are seen?
>> >>>
>> >>> As for what a model is, I'm wondering if it has to be a datatype or relation. Can it be another kind of citizen parallel to datatype/relation/function/db? Redshift also supports a `show models` operation, so it seems it's treated specially as well? The reasons I don't like Redshift's syntax are:
>> >>> 1. It's a bit verbose: you need to think of a model name as well as a function name, and the function name also needs to be unique.
>> >>> 2. More importantly, the prediction function isn't the only function that can operate on models. There could be a set of inference functions [1] and evaluation functions [2] which can operate on models. It's hard to specify all of them at model creation.
>> >>>
>> >>> [1]: https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-predict
>> >>> [2]: https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-evaluate
>> >>>
>> >>> Thanks,
>> >>> Hao
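[To make point 2 above concrete, here is an illustrative sketch of how one model could be shared by separate inference and evaluation functions, written in the PTF shape discussed elsewhere in this thread. The function names and signatures are placeholders, not agreed syntax:

-- the same model is referenced by different functions, rather than one
-- function name being bound to the model when it is created
SELECT * FROM TABLE(ML_PREDICT(TABLE my_table, classifier_model, f1, f2));
SELECT * FROM TABLE(ML_EVALUATE(TABLE eval_data, classifier_model, f1, f2, label));

Under the Redshift-style approach, each of these would instead need a function name chosen at CREATE MODEL time.]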
>> >>> On Thu, Mar 14, 2024 at 8:18 PM Jark Wu <imj...@gmail.com> wrote:
>> >>>
>> >>>> Hi Hao,
>> >>>>
>> >>>>> Can you send me some pointers where the function gets the table information?
>> >>>>
>> >>>> Here is the code of the cumulate window type checking [1].
>> >>>>
>> >>>>> Also is it possible to support <query_stmt> in window functions in addition to table?
>> >>>>
>> >>>> Yes. It is not allowed in TVF.
>> >>>>
>> >>>> Thanks for the syntax links of other systems. The reason I prefer the Redshift way is that it avoids introducing Model as a relation or datatype (referenced as a parameter in a TVF). Model is not a relation because it can't be queried directly (e.g., SELECT * FROM model). I'm also confused about making Model a datatype, because I don't know what class the model parameter of the eval method of TableFunction/ScalarFunction should be. By defining the function with the model, users can directly invoke the function without referencing the model name.
>> >>>>
>> >>>> Best,
>> >>>> Jark
>> >>>>
>> >>>> [1]: https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java#L53
>> >>>>
>> >>>> On Fri, 15 Mar 2024 at 02:48, Hao Li <h...@confluent.io.invalid> wrote:
>> >>>>
>> >>>>> Hi Jark,
>> >>>>>
>> >>>>> Thanks for the pointers. They are very helpful.
>> >>>>>
>> >>>>> 1. Looks like `tumble`, `hopping` are keywords in the Calcite parser. And the syntax `cumulate(Table my_table, ...)` needs to get table information from the catalog somewhere for type validation etc. Can you send me some pointers where the function gets the table information?
>> >>>>> 2. The ideal syntax for the model function I think would be `ML_PREDICT(MODEL <model_name>, {table <table_name> | (query_stmt) })`. I think with special handling of the `ML_PREDICT` function in the parser/planner, maybe we can do this like window functions. But to support the `MODEL` keyword, we need a Calcite parser change I guess. Also is it possible to support <query_stmt> in window functions in addition to table?
>> >>>>>
>> >>>>> For the Redshift syntax, I'm not sure about the purpose of defining the function name together with the model. Is it to define the function input/output schema? We have the schema in our CREATE MODEL syntax, and `ML_PREDICT` can handle it by getting the model definition. I think our syntax, with one generic prediction function, is more concise. I also did some research and it's the syntax used by Databricks `ai_query` [1], Snowflake `predict` [2], and Azure ML `predict` [3].
>> >>>>>
>> >>>>> [1]: https://docs.databricks.com/en/sql/language-manual/functions/ai_query.html
>> >>>>> [2]: https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_inference.ipynb?_fsi=sksXUwQ0
>> >>>>> [3]: https://learn.microsoft.com/en-us/sql/machine-learning/tutorials/quickstart-python-train-score-model?view=azuresqldb-mi-current
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Hao
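[As a reference point for the window TVF precedent cited throughout this thread, the existing CUMULATE function is invoked roughly like this today, following the pattern in the Flink documentation; the Bid table and its columns are the usual documentation example:

SELECT window_start, window_end, SUM(price)
FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

The TABLE argument is resolved against the catalog by the Calcite SqlValidator, which is the mechanism the thread discusses reusing for a model argument.]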
>> >>>>> On Wed, Mar 13, 2024 at 8:57 PM Jark Wu <imj...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hi Mingge, Hao,
>> >>>>>>
>> >>>>>> Thanks for your replies.
>> >>>>>>
>> >>>>>>> PTF is actually the ideal approach for model functions, and we do have plans to use PTF for all model functions (including prediction, evaluation etc.) once the PTF is supported in the FlinkSQL confluent extension.
>> >>>>>>
>> >>>>>> It sounds like PTF is the ideal way and the table function is a temporary solution which will be dropped in the future. I'm not sure whether we can implement it using PTF in Flink SQL, but we have implemented window functions using PTF [1] and introduced a new window function (called CUMULATE [2]) in Flink SQL based on this. I think it might work to use PTF and implement the model function syntax like this:
>> >>>>>>
>> >>>>>> SELECT * FROM TABLE(ML_PREDICT(
>> >>>>>>   TABLE my_table,
>> >>>>>>   my_model,
>> >>>>>>   col1,
>> >>>>>>   col2
>> >>>>>> ));
>> >>>>>>
>> >>>>>> Besides, did you consider following the way of AWS Redshift [3], which defines the model function together with the model itself? IIUC, a model is a black box which defines input parameters and output parameters, and that can be modeled into functions.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Jark
>> >>>>>>
>> >>>>>> [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#session
>> >>>>>> [2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-CumulatingWindows
>> >>>>>> [3]: https://github.com/aws-samples/amazon-redshift-ml-getting-started/blob/main/use-cases/bring-your-own-model-remote-inference/README.md#create-model
>> >>>>>>
>> >>>>>> On Wed, 13 Mar 2024 at 15:00, Hao Li <h...@confluent.io.invalid> wrote:
>> >>>>>>
>> >>>>>>> Hi Jark,
>> >>>>>>>
>> >>>>>>> Thanks for your questions. These are good questions!
>> >>>>>>>
>> >>>>>>> 1. The polymorphic table function I was referring to takes a table as input and outputs a table. So the syntax would be like
>> >>>>>>> ```
>> >>>>>>> SELECT * FROM ML_PREDICT('model', (SELECT * FROM my_table))
>> >>>>>>> ```
>> >>>>>>> As far as I know, this is not supported yet in Flink. So before it's supported, one option for the predict function is using a table function which can output multiple columns:
>> >>>>>>> ```
>> >>>>>>> SELECT * FROM my_table, LATERAL VIEW (ML_PREDICT('model', col1, col2))
>> >>>>>>> ```
>> >>>>>>>
>> >>>>>>> 2. Good question. Type inference is hard for the `ML_PREDICT` function because it takes a model name string as input. I can think of three ways of doing type inference for it.
>> >>>>>>>   1) Treat the `ML_PREDICT` function as something special: during SQL parsing or planning time, if it's encountered, we look up the model named by the first argument in the catalog. Then we can infer the input/output for the function.
>> >>>>>>>   2) Define a `model` keyword and use that in the predict function to indicate the argument refers to a model, so it's like `ML_PREDICT(model 'my_model', col1, col2)`.
>> >>>>>>>   3) Create a special type of table function, maybe called `ModelFunction`, which can resolve the model type inference by handling it specially during parsing or planning time.
>> >>>>>>> 1) is hacky, 2) isn't supported in Flink for functions, 3) might be a good option.
>> >>>>>>>
>> >>>>>>> 3. I sketched the `ML_PREDICT` function for inference, but there are limitations of the function mentioned in 1 and 2. So maybe we don't need to introduce them as built-in functions until polymorphic table functions are supported and we can properly deal with type inference. After that, defining a user-defined model function should also be straightforward.
>> >>>>>>>
>> >>>>>>> 4. For model types, do you mean 'remote', 'import', 'native' models or other things?
>> >>>>>>>
>> >>>>>>> 5. We could support popular providers such as 'azureml', 'vertexai', 'googleai' as long as we support the `ML_PREDICT` function. Users should be able to implement 3rd-party providers if they can implement a function handling the input/output for the provider.
>> >>>>>>>
>> >>>>>>> I think for the model functions, there are still dependencies or hacks we need to sort out as built-in functions. Maybe we can separate that out as a follow-up if we want to have them built in, and focus on the model syntax for this FLIP?
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Hao
>> >>>>>>>
>> >>>>>>> On Tue, Mar 12, 2024 at 10:33 PM Jark Wu <imj...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Mingge, Chris, Hao,
>> >>>>>>>>
>> >>>>>>>> Thanks for proposing this interesting idea. I think this is a nice step towards the AI world for Apache Flink. I don't know much about AI/ML, so I may have some stupid questions.
>> >>>>>>>>
>> >>>>>>>> 1. Could you tell us more about why a polymorphic table function (PTF) doesn't work, and do we have plans to use PTFs as model functions?
>> >>>>>>>> 2. What kind of object does the model map to in SQL? A relation or a data type? It looks like a data type because we use it as a parameter of the table function. If it is a data type, how does it cooperate with type inference [1]?
>> >>>>>>>>
>> >>>>>>>> 3. What built-in model functions will we support? How would a user-defined model function be defined?
>> >>>>>>>>
>> >>>>>>>> 4. What built-in model types will we support? How would a user-defined model type be defined?
>> >>>>>>>>
>> >>>>>>>> 5. Regarding the remote model, what providers will we support? Can users implement 3rd-party providers other than OpenAI?
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Jark
>> >>>>>>>>
>> >>>>>>>> [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
>> >>>>>>>>
>> >>>>>>>> On Wed, 13 Mar 2024 at 05:55, Hao Li <h...@confluent.io.invalid> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Dev,
>> >>>>>>>>>
>> >>>>>>>>> Mingge, Chris and I would like to start a discussion about FLIP-437: Support ML Models in Flink SQL.
>> >>>>>>>>>
>> >>>>>>>>> This FLIP proposes to support machine learning models in Flink SQL syntax so that users can CRUD models with Flink SQL and use models on Flink to do prediction with Flink data. The FLIP also proposes new model entities and changes to the catalog interface to support model CRUD operations in the catalog.
>> >>>>>>>>>
>> >>>>>>>>> For more details, see FLIP-437 [1]. Looking forward to your feedback.
>> >>>>>>>>>
>> >>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>> Mingge, Chris & Hao
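[To stitch the pieces of this thread together, here is a rough end-to-end sketch of the kind of model DDL and inference call under discussion: a model with an explicit input/output schema and provider options, followed by a PTF-style call. Every clause and option name here is illustrative only; the authoritative syntax is what FLIP-437 and the follow-up syntax FLIPs define.

-- illustrative model DDL: schema plus provider options, in the spirit of the thread
CREATE MODEL classifier_model
  INPUT (f1 INT, f2 STRING)
  OUTPUT (label STRING)
  WITH (
    'task' = 'classification',
    'type' = 'remote',       -- 'remote' / 'import' / 'native' model types are mentioned above
    'provider' = 'openai'    -- provider name and option keys are placeholders
  );

-- illustrative PTF-style inference call in the shape sketched by Jark and Timo above
SELECT f1, f2, label
FROM TABLE(ML_PREDICT(TABLE my_table, classifier_model, f1, f2));

Whether the call ends up wrapped in TABLE(), uses named arguments, or relies on a MODEL keyword is exactly the open syntax question of this thread.]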