Hi Timo,

I will look at the Catalog interface and see what it requires. In the meantime I'll go back to the old API. Do you expect FLIP-136 to resolve the issue around legacy types? Will its implementation allow registering LEGACY types, or a new variation of them?
On Mon, Dec 28, 2020 at 12:45 PM Timo Walther <[email protected]> wrote:

> I would recommend to use the old UDF stack for now. You can simply call
> `StreamTableEnvironment.registerFunction` instead of
> `createTemporarySystemFunction`. Then the UDF returns a legacy type that
> the DataStream API understands.
>
> Have you thought about implementing your own catalog instead of
> generating CREATE TABLE statements? The catalog API seems a bit complex
> at first glance but only requires to implement a couple of methods. In
> this case you can implement your own `CatalogTable` which is the parsed
> representation of a `CREATE TABLE` statement. In this case you would
> have the full control over the entire type stack end-to-end.
>
> Regards,
> Timo
>
> On 28.12.20 10:36, Yuval Itzchakov wrote:
> > Timo, an additional question.
> >
> > I am currently using TypeConversions.fromLegacyInfoToDataType. However,
> > this still does not allow me to register this table with the catalog
> > since any representation of LEGACY isn't supported (I don't see it
> > generating any other DataType other than RAW(.., LEGACY)). Is there any
> > way I can preserve the underlying type and still register the column
> > somehow with CREATE TABLE?
> >
> > On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov <[email protected]> wrote:
> >
> > Hi Timo,
> >
> > Thanks for the explanation. Passing around a byte array is not
> > possible since I need to know the concrete type later for
> > serialization in my sink, so I need to keep the type.
> >
> > What I'm trying to achieve is the following:
> > I have a scalar UDF function:
> >
> > [image.png]
> >
> > This function is later used in processing of a Flink table, by
> > calling PARSE_JSON on the selected field.
> > Whoever uses these UDFs isn't aware of any Flink syntax for creating
> > and registering tables, I generate the CREATE TABLE statement behind
> > the scenes, dynamically, for each table.
> > In order for this to work, I need the UDF to output
> > the correct type (in this example, io.circe.Json). Later, this JSON
> > type (or any other type returned by the UDF for that matter) will
> > get serialized in a custom sink and into a data warehouse (that's
> > why I need to keep the concrete type, since serialization happens at
> > the edges before writing it out).
> >
> > The reason I'm converting between Table and DataStream is that this
> > table will be further manipulated before being written to the custom
> > DynamicTableSink by applying transformations on a DataStream[Row],
> > and then converted back to a Table to be used in an additional
> > CREATE TABLE and then INSERT INTO statement.
> >
> > This is why I have these conversions back and forth, and why I
> > somehow need a way to register the legacy types as a valid type of
> > the table.
> > Hope that clarifies a bit, since the pipeline is rather complex I
> > can't really share a MVCE of it.
> >
> > On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <[email protected]> wrote:
> >
> > Hi Yuval,
> >
> > the legacy type has no string representation that can be used in a SQL
> > DDL statement. The current string representation LEGACY(...) is only a
> > temporary work around to persist the old types in catalogs.
> >
> > Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
> > support only legacy type info. So I would recommend to use the legacy
> > type in the UDF return type as well. Either you use the old
> > `getResultType` method or you override `getTypeInference` and call
> > `TypeConversions.fromLegacyInfoToDataType`.
> >
> > Another work around could be that you simply use `BYTES` as the return
> > type and pass around a byte array instead.
> >
> > Maybe you can show us a little end-to-end example, what you are trying
> > to achieve?
> >
> > Regards,
> > Timo
> >
> >
> > On 28.12.20 07:47, Yuval Itzchakov wrote:
> > > Hi Danny,
> > >
> > > Yes, I tried implementing the DataTypeFactory for the UDF using
> > > TypeInformationRawType (which is deprecated BTW, and there's no support
> > > for RawType in the conversion), didn't help.
> > >
> > > I did manage to get the conversion working using
> > > TableEnvironment.toAppendStream (I was previously directly calling
> > > TypeConversions) but still remains the problem that Flink can't register
> > > LEGACY types via the CREATE TABLE DDL
> > >
> > > On Mon, Dec 28, 2020, 04:25 Danny Chan <[email protected]> wrote:
> > >
> > > > SQL parse failed. Encount
> > > What syntax did you use ?
> > >
> > > > TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain
> > > > RAW type back to TypeInformation.
> > >
> > > Did you try to construct type information by a new
> > > fresh TypeInformationRawType ?
> > >
> > > Yuval Itzchakov <[email protected]> wrote on Thu, Dec 24, 2020 at 7:24 PM:
> > >
> > > An expansion to my question:
> > >
> > > What I really want is for the UDF to return `RAW(io.circe.Json, ?)`
> > > type, but I have to do a conversion between Table and DataStream, and
> > > TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW
> > > type back to TypeInformation.
> > >
> > > On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov <[email protected]> wrote:
> > >
> > > Hi,
> > >
> > > I have a UDF which returns a type of MAP<STRING,
> > > LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try to register
> > > this type with Flink via the CREATE TABLE DDL, I encounter
> > > an exception:
> > >
> > > - SQL parse failed. Encountered "(" at line 2, column 256.
> > > Was expecting one of:
> > >     "NOT" ...
> > >     "NULL" ...
> > >     ">" ...
> > >     "MULTISET" ...
> > >     "ARRAY" ...
> > >     "." ...
> > >
> > > Which looks like the planner doesn't like the round brackets
> > > on the LEGACY type. What is the correct way to register the
> > > table with this type with Flink?
> > > --
> > > Best Regards,
> > > Yuval Itzchakov.
> > >
> > >
> > >
> > > --
> > > Best Regards,
> > > Yuval Itzchakov.
> > >
> >
> >
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
> >
> >
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
>
>

--
Best Regards,
Yuval Itzchakov.
