Hi Dominik,
`toAppendStream` is soft deprecated in Flink 1.13 and will be deprecated
in Flink 1.14. It uses the old type system and might not match perfectly
with the reworked type system used by the new functions and sources.
For SQL, a lot of Avro classes need to be treated as RAW types. But we
might address this issue soon and further improve Avro support.
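For illustration, such an Avro-generated class ends up as a RAW type that
just wraps the extracted TypeInformation and its serializer. A rough,
untested sketch using the `OneEnum` class from your example below:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.DataTypes

// The Avro-generated enum has no direct SQL counterpart, so the planner
// carries it around as a RAW type backed by its TypeInformation/serializer.
val rawEnumType = DataTypes.RAW(TypeInformation.of(classOf[OneEnum]))
// prints something like RAW('org.test.OneEnum', ...)
println(rawEnumType)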
I would suggest continuing this discussion in a JIRA issue. Could you
also share the code for `NewEvent` and the Avro schema or generated Avro
class for `Event`, so that we have a fully reproducible example?
What can help is to define the types explicitly. E.g. you can use
`DataTypes.of(TypeInformation)` both in `ScalarFunction.getTypeInference`
and in `StreamTableEnvironment.toDataStream()`.
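Roughly something like the following (untested sketch, reusing the
`NewEvent`, `OneEnum` and `OtherEnum` classes and the `tableEnv`/`table`
variables from your example; the exact builder calls are just one possible
way to wire this up):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}

// 1) Pass an explicit target type when bridging the Table back to a DataStream:
val newEventStream = tableEnv.toDataStream[NewEvent](
  table,
  DataTypes.of(TypeInformation.of(classOf[NewEvent])))

// 2) Declare the argument and result types of the scalar function explicitly
//    instead of relying on @DataTypeHint("RAW"):
class EnumMappingFunction extends ScalarFunction {

  def eval(event: OneEnum): OtherEnum = OtherEnum.DEFAULT_VALUE

  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
    val argType = typeFactory.createDataType(
      DataTypes.of(TypeInformation.of(classOf[OneEnum])))
    val resultType = typeFactory.createDataType(
      DataTypes.of(TypeInformation.of(classOf[OtherEnum])))
    TypeInference.newBuilder()
      .typedArguments(argType)
      .outputTypeStrategy(TypeStrategies.explicit(resultType))
      .build()
  }
}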
I hope this helps.
Timo
On 09.08.21 16:27, Dominik Wosiński wrote:
It should be `id` instead of `licence` in the error, I've copy-pasted it
incorrectly :<
I've also tried an additional thing, i.e. creating a ScalarFunction that
maps one Avro-generated enum to another Avro-generated enum:
@FunctionHint(
  input = Array(
    new DataTypeHint(value = "RAW", bridgedTo = classOf[OneEnum])
  ),
  output = new DataTypeHint(value = "RAW", bridgedTo = classOf[OtherEnum])
)
class EnumMappingFunction extends ScalarFunction {
  def eval(event: OneEnum): OtherEnum = OtherEnum.DEFAULT_VALUE
}
This results in the following error:
*Invalid argument type at position 0. Data type RAW('org.test.OneEnum',
'...') expected but RAW('org.test.OneEnum', '...') passed.*
Mon, 9 Aug 2021 at 15:13 Dominik Wosiński <wos...@gmail.com> wrote:
Hey all,
I think I've hit some weird issue in Flink TypeInformation generation. I
have the following code:
val stream: DataStream[Event] = ...
tableEnv.createTemporaryView("TableName", stream)
val table = tableEnv
  .sqlQuery("SELECT id, timestamp, eventType FROM TableName")
tableEnv.toAppendStream[NewEvent](table)
In this particular example *Event* is an Avro-generated class and
*NewEvent* is just a POJO. This is just a toy example, so please ignore the
fact that this operation doesn't make much sense.
When I try to run the code I am getting the following error:
*org.apache.flink.table.api.ValidationException: Column types of query
result and sink for unregistered table do not match.
Cause: Incompatible types for sink column 'licence' at position 0.
Query schema: [id: RAW('org.apache.avro.util.Utf8', '...'), timestamp:
BIGINT NOT NULL, kind: RAW('org.test.EventType', '...')]
Sink schema: [id: RAW('org.apache.avro.util.Utf8', '?'), timestamp:
BIGINT, kind: RAW('org.test.EventType', '?')]*
So, it seems that the type is recognized correctly, but for some reason
there is still a mismatch according to Flink, maybe because a different
type serializer is used?
Thanks in advance for any help,
Best Regards,
Dom.