FYI.
I've managed to fix that by switching to `toDataStream` (a rough sketch of
the conversion is below); it seems to be working fine now.
I have created the issue about the UDF though, since it seems to be a
different issue. I'm not sure whether an issue should be created for
`toAppendStream` if it is meant to be deprecated anyway.
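
For anyone hitting the same thing, the conversion via `toDataStream` looks
roughly like this (untested sketch; `Event` is the Avro-generated class and
`NewEvent` the POJO from the snippet below, source setup omitted):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Avro-generated Event as the source type, same as in the original example.
val stream: DataStream[Event] = ...
tableEnv.createTemporaryView("TableName", stream)
val table = tableEnv.sqlQuery("SELECT id, `timestamp`, eventType FROM TableName")

// toDataStream goes through the new type system; the target type can be
// derived from the POJO class directly (Flink 1.13+).
val newEvents: DataStream[NewEvent] = tableEnv.toDataStream(table, classOf[NewEvent])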

Mon, 9 Aug 2021 at 19:33 Timo Walther <twal...@apache.org> wrote:

> Sorry, I meant "will be deprecated in Flink 1.14"
>
> On 09.08.21 19:32, Timo Walther wrote:
> > Hi Dominik,
> >
> > `toAppendStream` is soft deprecated in Flink 1.13 and will be deprecated
> > in Flink 1.13. It uses the old type system and might not match perfectly
> > with the reworked type system used in new functions and sources.
> >
> > For SQL, a lot of Avro classes need to be treated as RAW types. But we
> > might address this issue soon and further improve Avro support.
> >
> > I would suggest continuing this discussion in a JIRA issue. Can you
> > also share the code for `NewEvent` and the Avro schema or generated Avro
> > class for `Event` to have a fully reproducible example?
> >
> > What can help is to explicitly define the types:
> >
> > E.g. you can also use `DataTypes.of(TypeInformation)` both in
> > `ScalarFunction.getTypeInference` and
> > `StreamTableEnvironment.toDataStream()`.
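> >
> > For example, a rough (untested) sketch of that idea, using the enum
> > classes from your snippet below:
> >
> > import org.apache.flink.api.common.typeinfo.TypeInformation
> > import org.apache.flink.table.api.DataTypes
> > import org.apache.flink.table.catalog.DataTypeFactory
> > import org.apache.flink.table.functions.ScalarFunction
> > import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}
> >
> > class EnumMappingFunction extends ScalarFunction {
> >
> >   def eval(value: OneEnum): OtherEnum = OtherEnum.DEFAULT_VALUE
> >
> >   override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
> >     // Wrap the existing TypeInformation so that argument and result use
> >     // exactly the same RAW type (and thus the same serializer).
> >     val inType = typeFactory.createDataType(DataTypes.of(TypeInformation.of(classOf[OneEnum])))
> >     val outType = typeFactory.createDataType(DataTypes.of(TypeInformation.of(classOf[OtherEnum])))
> >     TypeInference.newBuilder()
> >       .typedArguments(inType)
> >       .outputTypeStrategy(TypeStrategies.explicit(outType))
> >       .build()
> >   }
> > }
> >
> > Overriding `getTypeInference` replaces the `@FunctionHint` annotations,
> > so the RAW hints are no longer needed in that case.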
> >
> > I hope this helps.
> >
> > Timo
> >
> > On 09.08.21 16:27, Dominik Wosiński wrote:
> >> It should be `id` instead of `licence` in the error; I copy-pasted it
> >> incorrectly :<
> >>
> >> I've also tried an additional thing, i.e. creating a ScalarFunction that
> >> maps one Avro-generated enum to another Avro-generated enum:
> >>
> >> @FunctionHint(
> >>    input = Array(
> >>      new DataTypeHint(value = "RAW", bridgedTo = classOf[OneEnum])
> >>    ),
> >>    output = new DataTypeHint(value = "RAW", bridgedTo = classOf[OtherEnum])
> >> )
> >> class EnumMappingFunction extends ScalarFunction {
> >>
> >>    def eval(event: OneEnum): OtherEnum = {OtherEnum.DEFAULT_VALUE}
> >> }
> >>
> >> This results in the following error:
> >>
> >> Invalid argument type at position 0. Data type RAW('org.test.OneEnum', '...') expected but RAW('org.test.OneEnum', '...') passed.
> >>
> >>
> >> Mon, 9 Aug 2021 at 15:13 Dominik Wosiński <wos...@gmail.com> wrote:
> >>
> >>> Hey all,
> >>>
> >>> I think I've hit some weird issue in Flink TypeInformation generation.
> >>> I have the following code:
> >>>
> >>> val stream: DataStream[Event] = ...
> >>> tableEnv.createTemporaryView("TableName", stream)
> >>> val table = tableEnv
> >>>   .sqlQuery("SELECT id, `timestamp`, eventType FROM TableName")
> >>> tableEnv.toAppendStream[NewEvent](table)
> >>>
> >>> In this particular example *Event* is an Avro-generated class and
> >>> *NewEvent* is just a POJO. This is just a toy example, so please ignore
> >>> the fact that this operation doesn't make much sense.
> >>>
> >>> When I try to run the code I am getting the following error:
> >>>
> >>> org.apache.flink.table.api.ValidationException: Column types of query
> >>> result and sink for unregistered table do not match.
> >>> Cause: Incompatible types for sink column 'licence' at position 0.
> >>> Query schema: [id: RAW('org.apache.avro.util.Utf8', '...'),
> >>>   timestamp: BIGINT NOT NULL, kind: RAW('org.test.EventType', '...')]
> >>> Sink schema: [id: RAW('org.apache.avro.util.Utf8', '?'),
> >>>   timestamp: BIGINT, kind: RAW('org.test.EventType', '?')]
> >>>
> >>> So it seems that the type is recognized correctly, but for some reason
> >>> Flink still reports a mismatch, maybe because a different type
> >>> serializer is used?
> >>>
> >>> Thanks in advance for any help,
> >>> Best Regards,
> >>> Dom.
> >>>
> >>
> >
>
>
