Hi Yuval,
FLIP-136 will resolve the remaining legacy type issues for DataStream
API. After that we can drop legacy types because everything can be
expressed with DataType. DataType is a super type of TypeInformation.
Registering types (e.g. giving a RAW type backed by io.circe.Json a name
such that it can be used in a DDL statement) is also in the backlog but
requires another FLIP.
Regards,
Timo
On 28.12.20 13:04, Yuval Itzchakov wrote:
Hi Timo,
I will look at the Catalog interface and see what it requires.
For the meanwhile I'll go back to the old API. Do you expect FLIP-136 to
resolve the issue around legacy types? Will it's implementation allow to
register LEGACY types? or a new variation of them?
On Mon, Dec 28, 2020 at 12:45 PM Timo Walther <[email protected]
<mailto:[email protected]>> wrote:
I would recommend to use the old UDF stack for now. You can simply call
`StreamTableEnvironment.registerFunction` instead of
`createTemporarySystemFunction`. Then the UDF returns a legacy type
that
the DataStream API understands.
Have you thought about implementing your own catalog instead of
generating CREATE TABLE statements? The catalog API seems a bit complex
at first glance but only requires to implement a couple of methods. In
this case you can implement your own `CatalogTable` which is the parsed
representation of a `CREATE TABLE` statement. In this case you would
have the full control over the entire type stack end-to-end.
Regards,
Timo
On 28.12.20 10:36, Yuval Itzchakov wrote:
> Timo, an additional question.
>
> I am currently using TypeConversions.fromLegacyInfoToDataType.
However,
> this still does not allow me to register this table with the catalog
> since any representation of LEGACY isn't supported (I don't see it
> generating any other DataType other than RAW(.., LEGACY). Is
there any
> way I can preserve the underlying type and still register the column
> somehow with CREATE TABLE?
>
> On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov
<[email protected] <mailto:[email protected]>
> <mailto:[email protected] <mailto:[email protected]>>> wrote:
>
> Hi Timo,
>
> Thanks for the explanation. Passing around a byte array is not
> possible since I need to know the concrete type later for
> serialization in my sink, so I need to keep the type.
>
> What I'm trying to achieve is the following:
> I have a scalar UDF function:
>
> image.png
>
> This function is later used in processing of a Flink table, by
> calling PARSE_JSON on the selected field.
> Whoever uses these UDFs isn't aware of any Flink syntax for
creating
> and registering tables, I generate the CREATE TABLE statement
behind
> the scenes, dynamically.
> for each table. In order for this to work, I need the UDF to
output
> the correct type (in this example, io.circe.Json). Later,
this JSON
> type (or any other type returned by the UDF for that matter) will
> get serialized in a custom sink and into a data warehouse (that's
> why I need to keep the concrete type, since serialization
happens at
> the edges before writing it out).
>
> The reason I'm converting between Table and DataStream is
that this
> table will be further manipulated before being written to the
custom
> DynamicTableSink by applying transformations on a
DataStream[Row],
> and then converted back to a Table to be used in an additional
> CREATE TABLE and then INSERT INTO statement.
>
> This is why I have these conversions back and forth, and why I
> somehow need a way to register the legacy types as a valid
type of
> the table.
> Hope that clarifies a bit, since the pipeline is rather complex I
> can't really share a MVCE of it.
>
> On Mon, Dec 28, 2020 at 11:08 AM Timo Walther
<[email protected] <mailto:[email protected]>
> <mailto:[email protected] <mailto:[email protected]>>> wrote:
>
> Hi Yuval,
>
> the legacy type has no string representation that can be
used in
> a SQL
> DDL statement. The current string representation
LEGACY(...) is
> only a
> temporary work around to persist the old types in catalogs.
>
> Until FLIP-136 is fully implemented,
toAppendStream/toRetractStream
> support only legacy type info. So I would recommend to
use the
> legacy
> type in the UDF return type as well. Either you use the old
> `getResultType` method or you override `getTypeInference`
and call
> `TypeConversions.fromLegacyInfoToDataType`.
>
> Another work around could be that you simply use `BYTES`
as the
> return
> type and pass around a byte array instead.
>
> Maybe you can show us a little end-to-end example, what
you are
> trying
> to achieve?
>
> Regards,
> Timo
>
>
> On 28.12.20 07:47, Yuval Itzchakov wrote:
> > Hi Danny,
> >
> > Yes, I tried implementing the DataTypeFactory for the
UDF using
> > TypeInformationRawType (which is deprecated BTW, and
there's
> no support
> > for RawType in the conversion), didn't help.
> >
> > I did manage to get the conversion working using
> > TableEnvironment.toAppendStream (I was previously directly
> calling
> > TypeConversions) but still remains the problem that Flink
> can't register
> > LEGACY types via the CREATE TABLE DDL
> >
> > On Mon, Dec 28, 2020, 04:25 Danny Chan
<[email protected] <mailto:[email protected]>
> <mailto:[email protected] <mailto:[email protected]>>
> > <mailto:[email protected]
<mailto:[email protected]> <mailto:[email protected]
<mailto:[email protected]>>>>
> wrote:
> >
> > > SQL parse failed. Encount
> > What syntax did you use ?
> >
> > > TypeConversions.fromDataTypeToLegacyInfo cannot
> convert a plain
> > RAW type back to TypeInformation.
> >
> > Did you try to construct type information by a new
> > fresh TypeInformationRawType ?
> >
> > Yuval Itzchakov <[email protected]
<mailto:[email protected]>
> <mailto:[email protected] <mailto:[email protected]>>
<mailto:[email protected] <mailto:[email protected]>
> <mailto:[email protected] <mailto:[email protected]>>>> 于
> > 2020年12月24日周四 下午7:24写道:
> >
> > An expansion to my question:
> >
> > What I really want is for the UDF to return
> `RAW(io.circe.Json,
> > ?)` type, but I have to do a conversion
between Table and
> > DataStream, and
> TypeConversions.fromDataTypeToLegacyInfo cannot
> > convert a plain RAW type back to TypeInformation.
> >
> > On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov
> > <[email protected] <mailto:[email protected]>
<mailto:[email protected] <mailto:[email protected]>>
> <mailto:[email protected] <mailto:[email protected]>
<mailto:[email protected] <mailto:[email protected]>>>> wrote:
> >
> > Hi,
> >
> > I have a UDF which returns a type
of MAP<STRING,
> > LEGACY('RAW', 'ANY<io.circe.Json>')>. When
I try
> to register
> > this type with Flink via the CREATE TABLE
DDL, I
> encounter
> > an exception:
> >
> > - SQL parse failed. Encountered "(" at line 2,
> column 256.
> > Was expecting one of:
> > "NOT" ...
> > "NULL" ...
> > ">" ...
> > "MULTISET" ...
> > "ARRAY" ...
> > "." ...
> >
> > Which looks like the planner doesn't like the
> round brackets
> > on the LEGACY type. What is the correct way to
> register the
> > table with this type with Flink?
> > --
> > Best Regards,
> > Yuval Itzchakov.
> >
> >
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
> >
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
--
Best Regards,
Yuval Itzchakov.