Hi Yuval,

FLIP-136 will resolve the remaining legacy type issues for the DataStream API. After that we can drop legacy types because everything can be expressed with DataType. DataType is a supertype of TypeInformation.

Registering types (e.g. giving a RAW type backed by io.circe.Json a name such that it can be used in a DDL statement) is also in the backlog but requires another FLIP.

Regards,
Timo


On 28.12.20 13:04, Yuval Itzchakov wrote:
Hi Timo,
I will look at the Catalog interface and see what it requires.

In the meantime I'll go back to the old API. Do you expect FLIP-136 to resolve the issue around legacy types? Will its implementation allow registering LEGACY types, or a new variation of them?

On Mon, Dec 28, 2020 at 12:45 PM Timo Walther wrote:

    I would recommend using the old UDF stack for now. You can simply call
    `StreamTableEnvironment.registerFunction` instead of
    `createTemporarySystemFunction`. Then the UDF returns a legacy type that
    the DataStream API understands.

    Have you thought about implementing your own catalog instead of
    generating CREATE TABLE statements? The catalog API seems a bit complex
    at first glance but only requires implementing a couple of methods. In
    this case you can implement your own `CatalogTable`, which is the parsed
    representation of a `CREATE TABLE` statement. You would then have full
    control over the entire type stack end-to-end.

    Regards,
    Timo

    On 28.12.20 10:36, Yuval Itzchakov wrote:
     > Timo, an additional question.
     >
     > I am currently using TypeConversions.fromLegacyInfoToDataType. However,
     > this still does not allow me to register this table with the catalog
     > since any representation of LEGACY isn't supported (I don't see it
     > generating any DataType other than RAW(.., LEGACY)). Is there any
     > way I can preserve the underlying type and still register the column
     > somehow with CREATE TABLE?
     >
     > On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov wrote:
     >
     >     Hi Timo,
     >
     >     Thanks for the explanation. Passing around a byte array is not
     >     possible since I need to know the concrete type later for
     >     serialization in my sink, so I need to keep the type.
     >
     >     What I'm trying to achieve is the following:
     >     I have a scalar UDF:
     >
     >     [attached image: image.png]
     >
     >     This function is later used in processing of a Flink table, by
     >     calling PARSE_JSON on the selected field.
     >     Whoever uses these UDFs isn't aware of any Flink syntax for creating
     >     and registering tables; I generate the CREATE TABLE statement behind
     >     the scenes, dynamically, for each table. In order for this to work,
     >     I need the UDF to output the correct type (in this example,
     >     io.circe.Json). Later, this JSON type (or any other type returned by
     >     the UDF for that matter) will get serialized in a custom sink and
     >     into a data warehouse (that's why I need to keep the concrete type,
     >     since serialization happens at the edges before writing it out).
     >
     >     The reason I'm converting between Table and DataStream is that this
     >     table will be further manipulated before being written to the custom
     >     DynamicTableSink by applying transformations on a DataStream[Row],
     >     and then converted back to a Table to be used in an additional
     >     CREATE TABLE and then INSERT INTO statement.
     >
     >     This is why I have these conversions back and forth, and why I
     >     somehow need a way to register the legacy types as a valid type of
     >     the table.
     >     Hope that clarifies a bit. Since the pipeline is rather complex, I
     >     can't really share an MVCE of it.
     >
     >     On Mon, Dec 28, 2020 at 11:08 AM Timo Walther wrote:
     >
     >         Hi Yuval,
     >
     >         the legacy type has no string representation that can be used in
     >         a SQL DDL statement. The current string representation LEGACY(...)
     >         is only a temporary workaround to persist the old types in
     >         catalogs.
     >
     >         Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
     >         support only legacy type info. So I would recommend using the
     >         legacy type in the UDF return type as well. Either you use the old
     >         `getResultType` method or you override `getTypeInference` and call
     >         `TypeConversions.fromLegacyInfoToDataType`.
     >
     >         Another workaround could be that you simply use `BYTES` as the
     >         return type and pass around a byte array instead.
     >
     >         Maybe you can show us a little end-to-end example of what you are
     >         trying to achieve?
     >
     >         Regards,
     >         Timo
     >
     >
     >         On 28.12.20 07:47, Yuval Itzchakov wrote:
     >          > Hi Danny,
     >          >
     >          > Yes, I tried implementing the DataTypeFactory for the UDF using
     >          > TypeInformationRawType (which is deprecated BTW, and there's no
     >          > support for RawType in the conversion), but it didn't help.
     >          >
     >          > I did manage to get the conversion working using
     >          > TableEnvironment.toAppendStream (I was previously calling
     >          > TypeConversions directly), but the problem remains that Flink
     >          > can't register LEGACY types via the CREATE TABLE DDL.
     >          >
     >          > On Mon, Dec 28, 2020, 04:25 Danny Chan wrote:
     >          >
     >          >      > SQL parse failed. Encount
     >          >     What syntax did you use?
     >          >
     >          >      > TypeConversions.fromDataTypeToLegacyInfo cannot convert a
     >          >      > plain RAW type back to TypeInformation.
     >          >
     >          >     Did you try to construct the type information with a
     >          >     fresh TypeInformationRawType?
     >          >
     >          >     On Thu, Dec 24, 2020 at 7:24 PM Yuval Itzchakov wrote:
     >          >
     >          >         An expansion to my question:
     >          >
     >          >         What I really want is for the UDF to return a
     >          >         `RAW(io.circe.Json, ?)` type, but I have to do a
     >          >         conversion between Table and DataStream, and
     >          >         TypeConversions.fromDataTypeToLegacyInfo cannot convert
     >          >         a plain RAW type back to TypeInformation.
     >          >
     >          >         On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov wrote:
     >          >
     >          >             Hi,
     >          >
     >          >             I have a UDF which returns a type of MAP<STRING,
     >          >             LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try to
     >          >             register this type with Flink via the CREATE TABLE
     >          >             DDL, I encounter an exception:
     >          >
     >          >             - SQL parse failed. Encountered "(" at line 2, column 256.
     >          >             Was expecting one of:
     >          >                  "NOT" ...
     >          >                  "NULL" ...
     >          >                  ">" ...
     >          >                  "MULTISET" ...
     >          >                  "ARRAY" ...
     >          >                  "." ...
     >          >
     >          >             It looks like the planner doesn't like the round
     >          >             brackets on the LEGACY type. What is the correct way
     >          >             to register a table with this type in Flink?
     >          >             --
     >          >             Best Regards,
     >          >             Yuval Itzchakov.
     >          >
     >          >
     >          >
     >          >         --
     >          >         Best Regards,
     >          >         Yuval Itzchakov.
     >          >
     >
     >
     >
     >     --
     >     Best Regards,
     >     Yuval Itzchakov.
     >
     >
     >
     > --
     > Best Regards,
     > Yuval Itzchakov.



--
Best Regards,
Yuval Itzchakov.
