Hi Peter,

don't get confused by the year 2017 in the ticket. We had better Avro support in the meantime, but it was based on the old type system around TypeInformation. Now we need to build up this support again for the new type system. I just came across this ticket and found that the title fits. We are planning to have better support soon, either in 1.15 or in 1.16 at the latest.

Regards,
Timo

On 20.10.21 18:43, Peter Schrott wrote:
Hi Timo,

sorry for being the party-pooper here! :O

My problem with the UDF is that the SQL SELECT will be passed in from outside, and the outside world does not know about the UDF.

Regarding the Utf8 issue: I know that feature, but unfortunately the schema is already up and running and can't be touched that easily. This case would actually be covered by your UDF suggestion, though.

Thanks for the update about the open ticket. It has been open since 2017, so it seems it will not be fixed in the near future. :)

Best, Peter

On Wed, Oct 20, 2021 at 5:55 PM Timo Walther <twal...@apache.org> wrote:

    Hi Peter,

    as a temporary workaround I would simply implement a UDF like:

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.InputGroup;
    import org.apache.flink.table.functions.ScalarFunction;

    public class EverythingToString extends ScalarFunction {

         // Accept any input type and fall back to its toString() representation.
         public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
           return o.toString();
         }
    }
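
    Registering and calling it could look like this (untested sketch;
    assumes a StreamTableEnvironment `tEnv`, the table and column names
    are illustrative):

    tEnv.createTemporarySystemFunction("EverythingToString", EverythingToString.class);
    Table result = tEnv.sqlQuery("SELECT EverythingToString(raw_col) FROM my_table");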

    For the Utf8 issue, you can instruct Avro to generate Java classes with
    String instead of Utf8, using the `avro.java.string` option.
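
    If you generate the classes with the avro-maven-plugin, that corresponds
    to the `stringType` setting (sketch, plugin version omitted):

    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <configuration>
        <!-- Generate java.lang.String instead of org.apache.avro.util.Utf8 -->
        <stringType>String</stringType>
      </configuration>
    </plugin>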

    The rework of the type system messed up the Avro support in Flink. This
    is a known issue that is tracked under

    https://issues.apache.org/jira/browse/FLINK-8183

    Regards,
    Timo

    On 20.10.21 17:30, Peter Schrott wrote:
     > Hi Timo,
     >
     > thanks a lot for your suggestion.
     >
     > I also considered this workaround, but when going from the DataStream
     > API to the Table API (using the POJO generated by the maven avro
     > plugin), types are not mapped correctly, esp. Utf8 (Avro's
     > implementation of CharSequence) and also enums. In the table I then
     > mostly have RAW types, which are not handy to perform SQL statements
     > on. It is already discussed here:
     > https://www.mail-archive.com/user@flink.apache.org/msg44449.html
     >
     > Best, Peter
     >
     > On Wed, Oct 20, 2021 at 5:21 PM Timo Walther <twal...@apache.org> wrote:
     >
     >     A current workaround is to use the DataStream API to read the data
     >     and provide your custom Avro schema to configure the format. Then
     >     switch to the Table API.
     >
     >     StreamTableEnvironment.fromDataStream(...) accepts all data types.
     >     Enum classes will be represented as RAW types, but you can forward
     >     them as black boxes or convert them in a UDF.
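     >
     >     The switch could look roughly like this (untested sketch; imports
     >     omitted, `kafkaProps` and the Avro-generated MyRecord POJO are
     >     assumed to exist, names are illustrative):
     >
     >     StreamExecutionEnvironment env =
     >         StreamExecutionEnvironment.getExecutionEnvironment();
     >     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
     >
     >     // Read the topic as the Avro-generated MyRecord POJO ...
     >     DataStream<MyRecord> stream = env.addSource(
     >         new FlinkKafkaConsumer<>(
     >             "my-topic",
     >             ConfluentRegistryAvroDeserializationSchema.forSpecific(
     >                 MyRecord.class, "http://...:8081"),
     >             kafkaProps));
     >
     >     // ... then switch to the Table API; enum fields arrive as RAW types.
     >     Table table = tEnv.fromDataStream(stream);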
     >
     >     We will further improve the support of external types in the Table
     >     API type system in the near future.
     >
     >     Regards,
     >     Timo
     >
     >     On 20.10.21 15:51, Peter Schrott wrote:
     >      > Hi people!
     >      >
     >      > I was digging deeper these days and found the "root cause" of
     >      > the issue and the difference between reading Avro from files and
     >      > reading Avro from Kafka & the Schema Registry.
     >      >
     >      > plz see:
     >      > https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E
     >      >
     >      > The main problem with Kafka & SR is that the
     >      > "org.apache.avro.generic.GenericDatumReader" is initialized with
     >      > an "expected" (reader) schema which is taken from the Flink SQL
     >      > table definition. When it comes to deserializing an attribute of
     >      > type "enum", it does not match the expected schema, where this
     >      > same attribute is typed as "string". Hence the Avro deserializer
     >      > breaks here.
     >      >
     >      > Not sure how to tackle that issue. The functioning of the
     >      > "GenericDatumReader" cannot really be changed. A solution could
     >      > be to create an analogous reader for reading data based on the
     >      > SQL DDL.
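     >      >
     >      > To illustrate the mismatch (hypothetical snippet, imports
     >      > omitted, schemas reduced to the enum field):
     >      >
     >      > // The writer schema from the Schema Registry types the field as an enum ...
     >      > Schema writer = new Schema.Parser().parse(
     >      >     "{\"type\":\"enum\",\"name\":\"MyEnumType\","
     >      >         + "\"symbols\":[\"TypeVal1\",\"TypeVal2\"]}");
     >      > // ... while the reader schema derived from the DDL says string.
     >      > Schema reader = Schema.create(Schema.Type.STRING);
     >      > GenericDatumReader<Object> datumReader =
     >      >     new GenericDatumReader<>(writer, reader);
     >      > // Decoding with this reader fails: the enum cannot be resolved
     >      > // against the expected string type (AvroTypeException).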
     >      >
     >      > Cheers, Peter
     >      >
     >      > On 2021/10/12 16:18:30 Dongwon Kim wrote:
     >      >> Hi community,
     >      >>
     >      >> Can I get advice on this question?
     >      >>
     >      >> Another user just sent me an email asking whether I found a
     >      >> solution or a workaround for this question, but I'm still stuck
     >      >> there.
     >      >>
     >      >> Any suggestions?
     >      >>
     >      >> Thanks in advance,
     >      >>
     >      >> Dongwon
     >      >>
     >      >> ---------- Forwarded message ---------
     >      >> From: Dongwon Kim <eastcirc...@gmail.com>
     >      >> Date: Mon, Aug 9, 2021 at 7:26 PM
     >      >> Subject: How to deserialize Avro enum type in Flink SQL?
     >      >> To: user <user@flink.apache.org>
     >      >>
     >      >>
     >      >> Hi community,
     >      >>
     >      >> I have a Kafka topic where the schema of its values is defined
     >      >> by the "MyRecord" record in the following Avro IDL and
     >      >> registered to the Confluent Schema Registry.
     >      >>> @namespace("my.type.avro")
     >      >>> protocol MyProtocol {
     >      >>>    enum MyEnumType {
     >      >>>      TypeVal1, TypeVal2
     >      >>>    }
     >      >>>    record MyEntry {
     >      >>>      MyEnumType type;
     >      >>>    }
     >      >>>    record MyRecord {
     >      >>>      array<MyEntry> entries;
     >      >>>    }
     >      >>> }
     >      >>
     >      >>
     >      >> To read from the topic, I've defined the following DDL:
     >      >>
     >      >>> CREATE TABLE my_table (
     >      >>>     `entries` ARRAY<ROW<
     >      >>>         `type` ??? (This is the main question)
     >      >>>     >>
     >      >>> ) WITH (
     >      >>>     'connector' = 'kafka',
     >      >>>     'topic' = 'my-topic',
     >      >>>     'properties.bootstrap.servers' = '...:9092',
     >      >>>     'scan.startup.mode' = 'latest-offset',
     >      >>>     'value.format' = 'avro-confluent',
     >      >>>     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
     >      >>> )
     >      >>
     >      >>
     >      >> And I run the following query:
     >      >>
     >      >>> SELECT * FROM my_table
     >      >>
     >      >>
     >      >> Now I got the following messages in Flink 1.13.1 when I use
     >      >> STRING for the type:
     >      >>
     >      >>> Caused by: java.io.IOException: Failed to deserialize Avro record.
     >      >>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
     >      >>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
     >      >>>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
     >      >>>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
     >      >>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
     >      >>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
     >      >>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
     >      >>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
     >      >>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
     >      >>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
     >      >>> Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
     >      >>>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
     >      >>>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
     >      >>>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
     >      >>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
     >      >>>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
     >      >>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
     >      >>>   ... 9 more
     >      >>
     >      >> The reason I use the STRING type is just for fast prototyping.
     >      >>
     >      >> While reading through [1], I've been thinking about using
     >      >> RAW('class', 'snapshot') where 'class' is my.type.avro.MyEnumType,
     >      >> but I'm not sure whether it is a good idea and, if so, what can
     >      >> be a value for the snapshot.
     >      >>
     >      >> [1]
     >      >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
     >      >>
     >      >> Thanks in advance,
     >      >>
     >      >> Dongwon
     >      >>
     >      >
     >

