Sorry, I mean we can bump the Calcite version if needed in Flink 1.20.

On Fri, 22 Mar 2024 at 22:19, Jark Wu <imj...@gmail.com> wrote:

> Hi Timo,
>
> Introducing user-defined PTF is very useful in Flink, I'm +1 for this.
> But I think the ML model FLIP is not blocked by this, because we
> can introduce ML_PREDICT and ML_EVALUATE as built-in PTFs
> just like TUMBLE/HOP. And support user-defined ML functions as
> a future FLIP.
>
> Regarding the simplified PTF syntax that drops the outer TABLE() keyword,
> it seems the Calcite community added support for it[1] just last month and
> it will be released in the next version (v1.37). The Calcite community is
> preparing the 1.37 release, so we can bump the version if needed in Flink 1.19.
>
> Best,
> Jark
>
> [1]: https://issues.apache.org/jira/browse/CALCITE-6254
>
> On Fri, 22 Mar 2024 at 21:46, Timo Walther <twal...@apache.org> wrote:
>
>> Hi everyone,
>>
>> this is a very important change to the Flink SQL syntax but we can't
>> wait until the SQL standard is ready for this. So I'm +1 on introducing
>> the MODEL concept as a first class citizen in Flink.
>>
>> For your information: Over the past months I have already spent a
>> significant amount of time thinking about how we can introduce PTFs in
>> Flink. I reserved FLIP-440[1] for this purpose and I will share a
>> version of this in the next 1-2 weeks.
>>
>> For a good implementation of FLIP-440 and also FLIP-437, we should
>> evolve the PTF syntax in collaboration with Apache Calcite.
>>
>> There are different syntax versions out there:
>>
>> 1) Flink
>>
>> SELECT * FROM
>>    TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
>>
>> 2) SQL standard
>>
>> SELECT * FROM
>>    TABLE(TUMBLE(TABLE(Bid), DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
>>
>> 3) Oracle
>>
>> SELECT * FROM
>>     TUMBLE(Bid, COLUMNS(bidtime), INTERVAL '10' MINUTES);
>>
>> As you can see above, Flink does not follow the standard correctly as it
>> would need to use `TABLE()` but this is not provided by Calcite yet.
>>
>> I really like the Oracle syntax[2][3] a lot. It reduces necessary
>> keywords to a minimum. Personally, I would like to discuss this syntax
>> in a separate FLIP and hope I will find supporters for:
>>
>>
>> SELECT * FROM
>>    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);
>>
>> If we go entirely with the Oracle syntax, as you can see in the example,
>> Oracle allows for passing identifiers directly. This would solve our
>> problems for the MODEL as well:
>>
>> SELECT f1, f2, label FROM ML_PREDICT(
>>    data => `my_data`,
>>    model => `classifier_model`,
>>    input => DESCRIPTOR(f1, f2));
>>
>> Or we completely adopt the Oracle syntax:
>>
>> SELECT f1, f2, label FROM ML_PREDICT(
>>    data => `my_data`,
>>    model => `classifier_model`,
>>    input => COLUMNS(f1, f2));
>>
>>
>> What do you think?
>>
>> Happy to create a FLIP for just this syntax question and collaborate
>> with the Calcite community on this. It shouldn't be too hard to convince
>> them to support the Oracle syntax, at least as a parser parameter.
>>
>> Regards,
>> Timo
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-440%3A+User-defined+Polymorphic+Table+Functions
>> [2] https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/DBMS_TF.html#GUID-0F66E239-DE77-4C0E-AC76-D5B632AB8072
>> [3] https://oracle-base.com/articles/18c/polymorphic-table-functions-18c
>>
>>
>>
>> On 20.03.24 17:22, Mingge Deng wrote:
>> > Thanks Jark for all the insightful comments.
>> >
>> > We have updated the proposal per our offline discussions:
>> > 1. Model will be treated as a new relation in FlinkSQL.
>> > 2. Include the common ML predict and evaluate functions in open
>> > source Flink to complete the user journey.
>> >      And we should be able to extend the Calcite SqlTableFunction to
>> >      support these two ML functions.
>> >
>> > Best,
>> > Mingge
>> >
>> > On Mon, Mar 18, 2024 at 7:05 PM Jark Wu <imj...@gmail.com> wrote:
>> >
>> >> Hi Hao,
>> >>
>> >>> I meant how the table name in window TVF gets translated to
>> >>> `SqlCallBinding`. Probably we need to fetch the table definition from
>> >>> the catalog somewhere. Do we treat those window TVFs specially in the
>> >>> parser/planner so that the catalog is looked up when they are seen?
>> >>
>> >> The table names are resolved and validated by Calcite SqlValidator. We
>> >> don't need to fetch from the catalog manually.
>> >> The specific checking logic of cumulate window happens in
>> >> SqlCumulateTableFunction.OperandMetadataImpl#checkOperandTypes.
>> >> The return type of SqlCumulateTableFunction is defined in
>> >> #getRowTypeInference() method.
>> >> Both are public interfaces provided by Calcite and it seems it's not
>> >> specially handled in parser/planner.
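>> >>
>> >> To make the above concrete, here is the kind of call the validator has to
>> >> handle, following the windowing TVF syntax from the Flink docs. The table
>> >> `Bid` and its columns `bidtime` and `price` are assumptions for
>> >> illustration only:
>> >>
>> >> ```sql
>> >> -- `TABLE Bid` is an ordinary table reference, so the validator resolves it
>> >> -- against the catalog like any other table in the query. The descriptor
>> >> -- and the two intervals are, as far as I can tell, the operands that
>> >> -- checkOperandTypes looks at.
>> >> SELECT window_start, window_end, SUM(price) AS total_price
>> >> FROM TABLE(
>> >>    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
>> >> GROUP BY window_start, window_end;
>> >> ```
>> >>
>> >> The extra output columns (window_start, window_end, window_time) are what
>> >> the row type inference adds on top of the input table's columns.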
>> >>
>> >> I didn't try that, but my gut feeling is that the framework is ready to
>> >> extend a customized TVF.
>> >>
>> >>> For what model is, I'm wondering if it has to be datatype or relation.
>> >>> Can it be another kind of citizen parallel to
>> >>> datatype/relation/function/db? Redshift also supports `show models`
>> >>> operation, so it seems it's treated specially as well?
>> >>
>> >> If it is an entity only used in catalog scope (e.g., show xxx, create xxx,
>> >> drop xxx), it is fine to introduce it.
>> >> We have introduced one like this before, called Module: "load module",
>> >> "show modules" [1].
>> >> But if we want to use Model in TVF parameters, it has to be a
>> >> relation or datatype, because those are the only things a TVF accepts now.
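>> >>
>> >> For reference, the Module statements are purely catalog-scoped, roughly
>> >> like the following sketch (the `hive` module and its version option are
>> >> just an example and assume the Hive connector is on the classpath):
>> >>
>> >> ```sql
>> >> -- Modules are only loaded, listed, ordered and unloaded; they are never
>> >> -- passed to a function as an argument.
>> >> LOAD MODULE hive WITH ('hive-version' = '3.1.3');
>> >> SHOW MODULES;
>> >> USE MODULES hive, core;
>> >> UNLOAD MODULE hive;
>> >> ```
>> >>
>> >> A Model that only needed CREATE/SHOW/DROP could follow the same pattern.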
>> >>
>> >> Thanks for sharing your reasons for preferring the TVF over the Redshift
>> >> way. They sound reasonable to me.
>> >>
>> >> Best,
>> >> Jark
>> >>
>> >> [1]:
>> >> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/modules/
>> >>
>> >> On Fri, 15 Mar 2024 at 13:41, Hao Li <h...@confluent.io.invalid> wrote:
>> >>
>> >>> Hi Jark,
>> >>>
>> >>> Thanks for the pointer. Sorry for the confusion: I meant how the table
>> >>> name in window TVF gets translated to `SqlCallBinding`. Probably we
>> >>> need to fetch the table definition from the catalog somewhere. Do we
>> >>> treat those window TVFs specially in the parser/planner so that the
>> >>> catalog is looked up when they are seen?
>> >>>
>> >>> For what model is, I'm wondering if it has to be datatype or relation.
>> >>> Can it be another kind of citizen parallel to
>> >>> datatype/relation/function/db? Redshift also supports `show models`
>> >>> operation, so it seems it's treated specially as well? The reasons I
>> >>> don't like Redshift's syntax are:
>> >>> 1. It's a bit verbose: you need to think of a model name as well as a
>> >>> function name, and the function name also needs to be unique.
>> >>> 2. More importantly, the prediction function isn't the only function
>> >>> that can operate on models. There could be a set of inference functions
>> >>> [1] and evaluation functions [2] which can operate on models. It's hard
>> >>> to specify all of them at model creation.
>> >>>
>> >>> [1]:
>> >>> https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-predict
>> >>> [2]:
>> >>> https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-evaluate
>> >>>
>> >>> Thanks,
>> >>> Hao
>> >>>
>> >>> On Thu, Mar 14, 2024 at 8:18 PM Jark Wu <imj...@gmail.com> wrote:
>> >>>
>> >>>> Hi Hao,
>> >>>>
>> >>>>> Can you send me some pointers
>> >>>>> where the function gets the table information?
>> >>>>
>> >>>> Here is the code of cumulate window type checking [1].
>> >>>>
>> >>>>> Also is it possible to support <query_stmt> in
>> >>>>> window functions in addition to table?
>> >>>>
>> >>>> Yes. It is not allowed in TVF.
>> >>>>
>> >>>> Thanks for the syntax links of other systems. The reason I prefer the
>> >>>> Redshift way is that it avoids introducing Model as a relation or
>> >>>> datatype (referenced as a parameter in TVF).
>> >>>> Model is not a relation because it can't be queried directly (e.g.,
>> >>>> SELECT * FROM model).
>> >>>> I'm also confused about making Model a datatype, because I don't
>> >>>> know what class the model parameter of the eval method of
>> >>>> TableFunction/ScalarFunction should be. By defining the function with
>> >>>> the model, users can directly invoke the function without reference to
>> >>>> the model name.
>> >>>>
>> >>>> Best,
>> >>>> Jark
>> >>>>
>> >>>> [1]:
>> >>>> https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java#L53
>> >>>>
>> >>>> On Fri, 15 Mar 2024 at 02:48, Hao Li <h...@confluent.io.invalid> wrote:
>> >>>>
>> >>>>> Hi Jark,
>> >>>>>
>> >>>>> Thanks for the pointers. It's very helpful.
>> >>>>>
>> >>>>> 1. Looks like `tumble`, `hopping` are keywords in calcite parser. And
>> >>>>> the syntax `cumulate(Table my_table, ...)` needs to get table
>> >>>>> information from catalog somewhere for type validation etc. Can you
>> >>>>> send me some pointers where the function gets the table information?
>> >>>>> 2. The ideal syntax for model function I think would be
>> >>>>> `ML_PREDICT(MODEL <model_name>, {table <table_name> | (query_stmt) })`.
>> >>>>> I think with special handling of the `ML_PREDICT` function in
>> >>>>> parser/planner, maybe we can do this like window functions. But to
>> >>>>> support `MODEL` keyword, we need calcite parser change I guess. Also is
>> >>>>> it possible to support <query_stmt> in window functions in addition to
>> >>>>> table?
>> >>>>>
>> >>>>> For the redshift syntax, I'm not sure of the purpose of defining the
>> >>>>> function name with the model. Is it to define the function input/output
>> >>>>> schema? We have the schema in our create model syntax and
>> >>>>> `ML_PREDICT` can handle it by getting the model definition. I think our
>> >>>>> syntax is more concise with a generic prediction function. I also did
>> >>>>> some research and it's the syntax used by Databricks `ai_query` [1],
>> >>>>> Snowflake `predict` [2], Azureml `predict` [3].
>> >>>>>
>> >>>>> [1]:
>> >>>>> https://docs.databricks.com/en/sql/language-manual/functions/ai_query.html
>> >>>>> [2]:
>> >>>>> https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_inference.ipynb?_fsi=sksXUwQ0
>> >>>>> [3]:
>> >>>>> https://learn.microsoft.com/en-us/sql/machine-learning/tutorials/quickstart-python-train-score-model?view=azuresqldb-mi-current
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Hao
>> >>>>>
>> >>>>> On Wed, Mar 13, 2024 at 8:57 PM Jark Wu <imj...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hi Mingge, Hao,
>> >>>>>>
>> >>>>>> Thanks for your replies.
>> >>>>>>
>> >>>>>>> PTF is actually the ideal approach for model functions, and we do
>> >>>>>>> have the plans to use PTF for all model functions (including
>> >>>>>>> prediction, evaluation etc..) once the PTF is supported in FlinkSQL
>> >>>>>>> confluent extension.
>> >>>>>>
>> >>>>>> It sounds like PTF is the ideal way and table function is a
>> >>>>>> temporary solution which will be dropped in the future.
>> >>>>>> I'm not sure whether we can implement it using PTF in Flink SQL. But
>> >>>>>> we have implemented window functions using PTF[1], and introduced a
>> >>>>>> new window function (called CUMULATE[2]) in Flink SQL based on this.
>> >>>>>> I think it might work to use PTF and implement model function syntax
>> >>>>>> like this:
>> >>>>>>
>> >>>>>> SELECT * FROM TABLE(ML_PREDICT(
>> >>>>>>    TABLE my_table,
>> >>>>>>    my_model,
>> >>>>>>    col1,
>> >>>>>>    col2
>> >>>>>> ));
>> >>>>>>
>> >>>>>> Besides, did you consider following the way of AWS Redshift[3], which
>> >>>>>> defines the model function together with the model itself?
>> >>>>>> IIUC, a model is a black box which defines input parameters and
>> >>>>>> output parameters, which can be modeled as functions.
>> >>>>>>
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Jark
>> >>>>>>
>> >>>>>> [1]:
>> >>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#session
>> >>>>>> [2]:
>> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-CumulatingWindows
>> >>>>>> [3]:
>> >>>>>> https://github.com/aws-samples/amazon-redshift-ml-getting-started/blob/main/use-cases/bring-your-own-model-remote-inference/README.md#create-model
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> On Wed, 13 Mar 2024 at 15:00, Hao Li <h...@confluent.io.invalid> wrote:
>> >>>>>>
>> >>>>>>> Hi Jark,
>> >>>>>>>
>> >>>>>>> Thanks for your questions. These are good questions!
>> >>>>>>>
>> >>>>>>> 1. The polymorphic table function I was referring to takes a table
>> >>>>>>> as input and outputs a table. So the syntax would be like
>> >>>>>>> ```
>> >>>>>>> SELECT * FROM ML_PREDICT('model', (SELECT * FROM my_table))
>> >>>>>>> ```
>> >>>>>>> As far as I know, this is not supported yet on Flink. So before
>> >>>>>>> it's supported, one option for the predict function is using a table
>> >>>>>>> function which can output multiple columns
>> >>>>>>> ```
>> >>>>>>> SELECT * FROM my_table, LATERAL TABLE(ML_PREDICT('model', col1, col2))
>> >>>>>>> ```
>> >>>>>>>
>> >>>>>>> 2. Good question. Type inference is hard for the `ML_PREDICT`
>> >>>>>>> function because it takes a model name string as input. I can think
>> >>>>>>> of three ways of doing type inference for it.
>> >>>>>>>     1). Treat the `ML_PREDICT` function as something special: during
>> >>>>>>> SQL parsing or planning, if it's encountered, we look up the model in
>> >>>>>>> the catalog using the first argument, which is a model name. Then we
>> >>>>>>> can infer the input/output for the function.
>> >>>>>>>     2). We can define a `model` keyword and use that in the predict
>> >>>>>>> function to indicate the argument refers to a model. So it's like
>> >>>>>>> `ML_PREDICT(model 'my_model', col1, col2)`
>> >>>>>>>     3). We can create a special type of table function, maybe called
>> >>>>>>> `ModelFunction`, which can resolve the model type inference by
>> >>>>>>> special handling during parsing or planning.
>> >>>>>>> 1) is hacky, 2) isn't supported in Flink for functions, 3) might be
>> >>>>>>> a good option.
>> >>>>>>>
>> >>>>>>> 3. I sketched the `ML_PREDICT` function for inference. But there
>> >>>>>>> are limitations of the function mentioned in 1 and 2. So maybe we
>> >>>>>>> don't need to introduce them as built-in functions until polymorphic
>> >>>>>>> table functions are supported and we can properly deal with type
>> >>>>>>> inference.
>> >>>>>>> After that, defining a user-defined model function should also be
>> >>>>>>> straightforward.
>> >>>>>>>
>> >>>>>>> 4. For model types, do you mean 'remote', 'import', 'native' models
>> >>>>>>> or other things?
>> >>>>>>>
>> >>>>>>> 5. We could support popular providers such as 'azureml', 'vertexai',
>> >>>>>>> 'googleai' as long as we support the `ML_PREDICT` function. Users
>> >>>>>>> should be able to implement 3rd-party providers if they can implement
>> >>>>>>> a function handling the input/output for the provider.
>> >>>>>>>
>> >>>>>>> I think for the model functions, there are still dependencies or
>> >>>>>>> hacks we need to sort out as a built-in function. Maybe we can
>> >>>>>>> separate that as a follow-up if we want to have it built-in and focus
>> >>>>>>> on the model syntax for this FLIP?
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Hao
>> >>>>>>>
>> >>>>>>> On Tue, Mar 12, 2024 at 10:33 PM Jark Wu <imj...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Mingge, Chris, Hao,
>> >>>>>>>>
>> >>>>>>>> Thanks for proposing this interesting idea. I think this is a nice
>> >>>>>>>> step towards the AI world for Apache Flink. I don't know much about
>> >>>>>>>> AI/ML, so I may have some stupid questions.
>> >>>>>>>>
>> >>>>>>>> 1. Could you tell us more about why the polymorphic table function
>> >>>>>>>> (PTF) doesn't work, and do we have a plan to use PTF for model
>> >>>>>>>> functions?
>> >>>>>>>>
>> >>>>>>>> 2. What kind of object does the model map to in SQL? A relation or a
>> >>>>>>>> data type?
>> >>>>>>>> It looks like a data type because we use it as a parameter of the
>> >>>>>>>> table function.
>> >>>>>>>> If it is a data type, how does it cooperate with type inference[1]?
>> >>>>>>>>
>> >>>>>>>> 3. What built-in model functions will we support? How to define a
>> >>>>>>>> user-defined model function?
>> >>>>>>>>
>> >>>>>>>> 4. What built-in model types will we support? How to define a
>> >>>>>>>> user-defined model type?
>> >>>>>>>>
>> >>>>>>>> 5. Regarding the remote model, what providers will we support? Can
>> >>>>>>>> users implement 3rd-party providers other than OpenAI?
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Jark
>> >>>>>>>>
>> >>>>>>>> [1]:
>> >>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Wed, 13 Mar 2024 at 05:55, Hao Li <h...@confluent.io.invalid> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi, Dev
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Mingge, Chris and I would like to start a discussion about
>> >>>>>>>>> FLIP-437: Support ML Models in Flink SQL.
>> >>>>>>>>>
>> >>>>>>>>> This FLIP proposes supporting machine learning models in Flink SQL
>> >>>>>>>>> syntax so that users can CRUD models with Flink SQL and use models
>> >>>>>>>>> in Flink to do prediction with Flink data. The FLIP also proposes
>> >>>>>>>>> new model entities and changes to the catalog interface to support
>> >>>>>>>>> model CRUD operations in the catalog.
>> >>>>>>>>>
>> >>>>>>>>> For more details, see FLIP-437 [1]. Looking forward to your
>> >>>>>>>>> feedback.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> [1]
>> >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>> Mingge, Chris & Hao