FYI, I've managed to fix that by switching to `toDataStream`. It seems to be working fine now. I have created an issue about the UDF though, since it seems to be a different problem. I'm not sure whether an issue should also be created for `toAppendStream`, given that it is meant to be deprecated.
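For anyone finding this thread later, here is a minimal sketch of what such a switch can look like. It assumes Flink 1.13 with the Scala bridge API. The two case classes are hypothetical stand-ins for the Avro-generated `Event` and the `NewEvent` POJO, neither of which is shown in this thread, so this is not the exact code from my job. The backticks around `timestamp` are also an assumption, added because it is a reserved word in Flink SQL.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Hypothetical stand-ins: the real Event is Avro-generated, the real NewEvent is a POJO.
case class Event(id: String, timestamp: Long, eventType: String)
case class NewEvent(id: String, timestamp: Long, eventType: String)

object ToDataStreamSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // A one-element stream instead of the real source (assumption for the sketch).
    val stream: DataStream[Event] = env.fromElements(Event("a", 1L, "CREATED"))
    tableEnv.createTemporaryView("TableName", stream)

    val table = tableEnv.sqlQuery("SELECT id, `timestamp`, eventType FROM TableName")

    // Before: tableEnv.toAppendStream[NewEvent](table)   (old type system)
    // After: toDataStream derives the target type through the new type system.
    val result: DataStream[NewEvent] = tableEnv.toDataStream(table, classOf[NewEvent])

    result.print()
    env.execute("toDataStream sketch")
  }
}
```

Whether the target class maps cleanly still depends on the actual field types, so with Avro-generated classes an explicitly declared data type may be needed, as Timo suggests below.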
On Mon, 9 Aug 2021 at 19:33, Timo Walther <twal...@apache.org> wrote:

> Sorry, I meant "will be deprecated in Flink 1.14"
>
> On 09.08.21 19:32, Timo Walther wrote:
> > Hi Dominik,
> >
> > `toAppendStream` is soft deprecated in Flink 1.13 and will be deprecated
> > in Flink 1.13. It uses the old type system and might not match perfectly
> > with the other reworked type system in new functions and sources.
> >
> > For SQL, a lot of Avro classes need to be treated as RAW types. But we
> > might address this issue soon and further improve Avro support.
> >
> > I would suggest continuing this discussion in a JIRA issue. Can you
> > also share the code for `NewEvent` and the Avro schema or generated Avro
> > class for `Event`, so that we have a fully reproducible example?
> >
> > What can help is to explicitly define the types:
> >
> > E.g. you can also use `DataTypes.of(TypeInformation)` both in
> > `ScalarFunction.getTypeInference` and
> > `StreamTableEnvironment.toDataStream()`.
> >
> > I hope this helps.
> >
> > Timo
> >
> > On 09.08.21 16:27, Dominik Wosiński wrote:
> >> It should be `id` instead of `licence` in the error, I've copy-pasted it
> >> incorrectly :<
> >>
> >> I've also tried an additional thing, i.e. creating a ScalarFunction that
> >> maps one Avro-generated enum to another Avro-generated enum:
> >>
> >> @FunctionHint(
> >>   input = Array(
> >>     new DataTypeHint(value = "RAW", bridgedTo = classOf[OneEnum])
> >>   ),
> >>   output = new DataTypeHint(value = "RAW", bridgedTo = classOf[OtherEnum])
> >> )
> >> class EnumMappingFunction extends ScalarFunction {
> >>
> >>   def eval(event: OneEnum): OtherEnum = {OtherEnum.DEFAULT_VALUE}
> >> }
> >>
> >> This results in the following error:
> >>
> >> *Invalid argument type at position 0. Data type RAW('org.test.OneEnum',
> >> '...') expected but RAW('org.test.OneEnum', '...') passed.*
> >>
> >> On Mon, 9 Aug 2021 at 15:13, Dominik Wosiński <wos...@gmail.com> wrote:
> >>
> >>> Hey all,
> >>>
> >>> I think I've hit some weird issue in Flink TypeInformation generation.
> >>> I have the following code:
> >>>
> >>> val stream: DataStream[Event] = ...
> >>> tableEnv.createTemporaryView("TableName", stream)
> >>> val table = tableEnv
> >>>   .sqlQuery("SELECT id, timestamp, eventType from TableName")
> >>> tableEnv.toAppendStream[NewEvent](table)
> >>>
> >>> In this particular example *Event* is an Avro-generated class and
> >>> *NewEvent* is just a POJO. This is just a toy example, so please ignore
> >>> the fact that this operation doesn't make much sense.
> >>>
> >>> When I try to run the code I am getting the following error:
> >>>
> >>> *org.apache.flink.table.api.ValidationException: Column types of query
> >>> result and sink for unregistered table do not match.*
> >>> *Cause: Incompatible types for sink column 'licence' at position 0.*
> >>> *Query schema: [id: RAW('org.apache.avro.util.Utf8', '...'), timestamp:
> >>> BIGINT NOT NULL, kind: RAW('org.test.EventType', '...')]*
> >>> *Sink schema: [id: RAW('org.apache.avro.util.Utf8', '?'), timestamp:
> >>> BIGINT, kind: RAW('org.test.EventType', '?')]*
> >>>
> >>> So it seems that the types are recognized correctly, but for some reason
> >>> Flink still reports a mismatch, maybe because a different type
> >>> serializer is being used?
> >>>
> >>> Thanks in advance for any help,
> >>> Best Regards,
> >>> Dom.