[ 
https://issues.apache.org/jira/browse/FLINK-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17219588#comment-17219588
 ] 

xiaozilong commented on FLINK-19779:
------------------------------------

Hi [~danny0405], I think there is still a bug in the PR. Code like the 
following triggers the problem: 
{code:java}
RowType rowType = (RowType) TableSchema.builder()
   .field("row1", DataTypes.ROW(DataTypes.FIELD("a", DataTypes.STRING())))
   .field("row2", DataTypes.ROW(DataTypes.FIELD("b", DataTypes.STRING())))
   .field("row3", DataTypes.ROW(
      DataTypes.FIELD("row2", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))))
   .build().toRowDataType().getLogicalType();
Schema schema = AvroSchemaConverter.convertToSchema(rowType);
System.out.println(schema.toString(true));{code}
It throws:
{code:java}
org.apache.avro.SchemaParseException: Can't redefine: row2{code}
I think we could use a special string as the top-level record name. Thanks.
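
To illustrate why the name collides, here is a simplified model in plain Java. It does not use Flink or Avro classes, and the class name, the "record" top-level name, and the "_" separator are all assumptions made for the sketch, not necessarily what the PR does. As far as I understand, Avro rejects a schema in which two different record types share one name, so the sketch only compares two ways of deriving names for nested rows:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RecordNameSketch {

    // Hypothetical model: a row maps a field name to either a nested row
    // (another map) or null for a leaf type such as STRING.
    static Map<String, Object> row(String field, Object type) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put(field, type);
        return r;
    }

    // Same shape as the failing schema: row1{a}, row2{b}, row3{row2{c}}.
    static Map<String, Object> example() {
        Map<String, Object> top = new LinkedHashMap<>();
        top.put("row1", row("a", null));
        top.put("row2", row("b", null));
        top.put("row3", row("row2", row("c", null)));
        return top;
    }

    // Collects, depth first, the record name each nested row would receive.
    // qualify=false: the bare field name is used.
    // qualify=true: the parent record name plus "_" plus the field name is used.
    static void collect(Map<String, Object> row, String parent, boolean qualify, List<String> out) {
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (e.getValue() instanceof Map) {
                String name = qualify ? parent + "_" + e.getKey() : e.getKey();
                out.add(name);
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                collect(child, name, qualify, out);
            }
        }
    }

    static List<String> recordNames(boolean qualify) {
        List<String> out = new ArrayList<>();
        collect(example(), "record", qualify, out);
        return out;
    }

    static boolean hasDuplicates(List<String> names) {
        Set<String> seen = new HashSet<>(names);
        return seen.size() != names.size();
    }

    public static void main(String[] args) {
        // Bare names: row2 appears twice for two different record types.
        System.out.println(recordNames(false) + " duplicates=" + hasDuplicates(recordNames(false)));
        // Path-qualified names: every record gets a distinct name.
        System.out.println(recordNames(true) + " duplicates=" + hasDuplicates(recordNames(true)));
    }
}
```

With bare field names the two different row2 records collide. With names qualified by the parent path they stay distinct, and a fixed top-level name is what keeps the root record independent of any field name.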

> Remove the "record_" field name prefix for Confluent Avro format 
> deserialization
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-19779
>                 URL: https://issues.apache.org/jira/browse/FLINK-19779
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.12.0
>            Reporter: Danny Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>
> Reported by Maciej Bryński :
> The problem is that this is not compatible. I'm unable to read anything from 
> Kafka using Confluent Registry. Example:
> I have data in Kafka with the following value schema:
> {code:java}
> {
>   "type": "record",
>   "name": "myrecord",
>   "fields": [
>     {
>       "name": "f1",
>       "type": "string"
>     }
>   ]
> }
> {code}
> I'm creating a table using the avro-confluent format:
> {code:sql}
> create table `test` (
>       `f1` STRING
> ) WITH (
>   'connector' = 'kafka', 
>   'topic' = 'test', 
>   'properties.bootstrap.servers' = 'localhost:9092', 
>   'properties.group.id' = 'test1234', 
>   'scan.startup.mode' = 'earliest-offset', 
>   'format' = 'avro-confluent',
>   'avro-confluent.schema-registry.url' = 'http://localhost:8081'
> );
> {code}
> When I try to select data, I get this error:
> {code:noformat}
> SELECT * FROM test;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.avro.AvroTypeException: Found myrecord, expecting record, missing required field record_f1
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
