I would recommend sticking with the old UDF stack for now. You can simply call `StreamTableEnvironment.registerFunction` instead of `createTemporarySystemFunction`. The UDF then returns a legacy type that the DataStream API understands.
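
For example, a minimal sketch (the `ParseJson` class, its parsing logic
and the `tEnv` variable are placeholders for illustration):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.functions.ScalarFunction

class ParseJson extends ScalarFunction {
  def eval(s: String): io.circe.Json =
    io.circe.parser.parse(s).getOrElse(io.circe.Json.Null)

  // Old type stack: declare the result type as legacy TypeInformation,
  // which surfaces as a legacy RAW/ANY type in the Table API.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    TypeInformation.of(classOf[io.circe.Json])
}

// tEnv: your StreamTableEnvironment
tEnv.registerFunction("PARSE_JSON", new ParseJson)   // old stack
// instead of:
// tEnv.createTemporarySystemFunction("PARSE_JSON", new ParseJson)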

Have you thought about implementing your own catalog instead of generating CREATE TABLE statements? The catalog API seems a bit complex at first glance, but it only requires implementing a couple of methods. You can then provide your own `CatalogTable`, which is the parsed representation of a `CREATE TABLE` statement, and that way you have full control over the entire type stack end-to-end.
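
A rough sketch of that direction (catalog, database, table and connector
names are placeholders; `GenericInMemoryCatalog` and `CatalogTableImpl`
are just convenient starting points):

import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath}
import scala.collection.JavaConverters._

// tEnv: your StreamTableEnvironment
val catalog = new GenericInMemoryCatalog("my_catalog", "my_db")
tEnv.registerCatalog("my_catalog", catalog)

val schema = TableSchema.builder()
  .field("id", DataTypes.STRING())
  .field("payload", DataTypes.STRING()) // or a custom DataType, bypassing the DDL parser
  .build()

val table = new CatalogTableImpl(
  schema,
  Map("connector" -> "my-connector").asJava, // connector options
  "programmatically registered table")

catalog.createTable(new ObjectPath("my_db", "my_table"), table, false)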

Regards,
Timo

On 28.12.20 10:36, Yuval Itzchakov wrote:
Timo, an additional question.

I am currently using TypeConversions.fromLegacyInfoToDataType. However, this still does not allow me to register the table with the catalog, since no representation of LEGACY is supported (I don't see it generating any DataType other than RAW(.., LEGACY)). Is there any way I can preserve the underlying type and still register the column somehow with CREATE TABLE?

On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov <[email protected]> wrote:

    Hi Timo,

    Thanks for the explanation. Passing around a byte array is not
    possible since I need to know the concrete type later for
    serialization in my sink, so I need to keep the type.

    What I'm trying to achieve is the following:
    I have a scalar UDF function:

    [inline image: definition of the PARSE_JSON scalar UDF]

    This function is later used when processing a Flink table, by
    calling PARSE_JSON on the selected field.
    Whoever uses these UDFs isn't aware of any Flink syntax for creating
    and registering tables; I generate the CREATE TABLE statement behind
    the scenes, dynamically, for each table. In order for this to work,
    I need the UDF to output the correct type (in this example,
    io.circe.Json). Later, this JSON type (or any other type returned by
    the UDF, for that matter) will get serialized in a custom sink and
    into a data warehouse (that's why I need to keep the concrete type,
    since serialization happens at the edges, right before writing it
    out).

    The reason I'm converting between Table and DataStream is that this
    table will be further manipulated before being written to the custom
    DynamicTableSink by applying transformations on a DataStream[Row],
    and then converted back to a Table to be used in an additional
    CREATE TABLE and then INSERT INTO statement.
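
    Schematically, the flow is roughly like this (a sketch; the variable
    and view names are placeholders, not the real pipeline):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row

    // tableEnv: StreamTableEnvironment; sourceTable: Table produced by
    // the generated CREATE TABLE plus a SELECT that applies PARSE_JSON
    val stream: DataStream[Row] = tableEnv.toAppendStream[Row](sourceTable)
    val transformed: DataStream[Row] =
      stream.map(row => row /* custom Row transformations */)
    val backToTable = tableEnv.fromDataStream(transformed)
    tableEnv.createTemporaryView("transformed_view", backToTable)
    // ...followed by the generated INSERT INTO <sink> SELECT ... FROM transformed_view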

    This is why I have these conversions back and forth, and why I
    somehow need a way to register the legacy types as valid types of
    the table.
    Hope that clarifies things a bit; since the pipeline is rather
    complex, I can't really share an MVCE of it.

    On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <[email protected]> wrote:

        Hi Yuval,

        the legacy type has no string representation that can be used in
        a SQL DDL statement. The current string representation LEGACY(...)
        is only a temporary workaround to persist the old types in catalogs.

        Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
        support only legacy type info. So I would recommend using the legacy
        type in the UDF return type as well. Either you use the old
        `getResultType` method or you override `getTypeInference` and call
        `TypeConversions.fromLegacyInfoToDataType`.
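
        For example, a rough sketch of the `getTypeInference` variant
        (assuming the UDF returns io.circe.Json; the class name is made up):

        import org.apache.flink.api.common.typeinfo.TypeInformation
        import org.apache.flink.table.catalog.DataTypeFactory
        import org.apache.flink.table.functions.ScalarFunction
        import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}
        import org.apache.flink.table.types.utils.TypeConversions

        class ParseJson extends ScalarFunction {
          def eval(s: String): io.circe.Json =
            io.circe.parser.parse(s).getOrElse(io.circe.Json.Null)

          // New stack, but with an output type derived from legacy type info:
          override def getTypeInference(typeFactory: DataTypeFactory): TypeInference =
            TypeInference.newBuilder()
              .outputTypeStrategy(TypeStrategies.explicit(
                TypeConversions.fromLegacyInfoToDataType(
                  TypeInformation.of(classOf[io.circe.Json]))))
              .build()
        }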

        Another workaround could be to simply use `BYTES` as the return
        type and pass around a byte array instead.
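
        E.g. a sketch (the serialization format at the UDF boundary is up
        to you; here the JSON is just rendered to UTF-8 bytes):

        import org.apache.flink.table.functions.ScalarFunction

        class ParseJsonAsBytes extends ScalarFunction {
          // declare BYTES in the DDL, serialize at the UDF boundary
          def eval(s: String): Array[Byte] =
            io.circe.parser.parse(s).getOrElse(io.circe.Json.Null)
              .noSpaces.getBytes(java.nio.charset.StandardCharsets.UTF_8)
        }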

        Maybe you can show us a little end-to-end example of what you are
        trying to achieve?

        Regards,
        Timo


        On 28.12.20 07:47, Yuval Itzchakov wrote:
         > Hi Danny,
         >
          > Yes, I tried implementing the DataTypeFactory for the UDF using
          > TypeInformationRawType (which is deprecated BTW, and there's no
          > support for RawType in the conversion), but it didn't help.
         >
          > I did manage to get the conversion working using
          > TableEnvironment.toAppendStream (I was previously calling
          > TypeConversions directly), but the problem remains that Flink
          > can't register LEGACY types via the CREATE TABLE DDL.
         >
          > On Mon, Dec 28, 2020, 04:25 Danny Chan <[email protected]> wrote:
         >
         >      > SQL parse failed. Encount
         >     What syntax did you use ?
         >
         >      > TypeConversions.fromDataTypeToLegacyInfo cannot
        convert a plain
         >     RAW type back to TypeInformation.
         >
         >     Did you try to construct type information by a new
         >     fresh TypeInformationRawType ?
         >
          >     Yuval Itzchakov <[email protected]> wrote on Thu,
          >     Dec 24, 2020 at 7:24 PM:
         >
         >         An expansion to my question:
         >
         >         What I really want is for the UDF to return
        `RAW(io.circe.Json,
         >         ?)` type, but I have to do a conversion between Table and
         >         DataStream, and
        TypeConversions.fromDataTypeToLegacyInfo cannot
         >         convert a plain RAW type back to TypeInformation.
         >
          >         On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov
          >         <[email protected]> wrote:
         >
         >             Hi,
         >
         >             I have a UDF which returns a type of MAP<STRING,
         >             LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try
        to register
         >             this type with Flink via the CREATE TABLE DDL, I
        encounter
         >             an exception:
         >
         >             - SQL parse failed. Encountered "(" at line 2,
        column 256.
         >             Was expecting one of:
         >                  "NOT" ...
         >                  "NULL" ...
         >                  ">" ...
         >                  "MULTISET" ...
         >                  "ARRAY" ...
         >                  "." ...
         >
         >             Which looks like the planner doesn't like the
        round brackets
         >             on the LEGACY type. What is the correct way to
        register the
         >             table with this type with Flink?
         >             --
         >             Best Regards,
         >             Yuval Itzchakov.
         >
         >
         >
         >         --
         >         Best Regards,
         >         Yuval Itzchakov.
         >



    --
    Best Regards,
    Yuval Itzchakov.



--
Best Regards,
Yuval Itzchakov.
