[ https://issues.apache.org/jira/browse/FLINK-9384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486929#comment-16486929 ]
ASF GitHub Bot commented on FLINK-9384:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6026#discussion_r190170050

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
    @@ -76,19 +77,35 @@
     	 */
     	private SpecificRecord record;
     
    +	/** Type information describing the result type.
    +	 *
    +	 */
    +	private final TypeInformation<Row> typeInfo;
    +
     	/**
     	 * Creates a Avro deserialization schema for the given record.
     	 *
     	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
     	 */
    -	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
    +	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz){
    +		this(recordClazz, null);
    +	}
    +
    +	/**
    +	 * Creates a Avro deserialization schema for the given record.
    +	 *
    +	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
    +	 * @param typeInfo Type information describing the result type.
    +	 */
    +	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz, TypeInformation<Row> typeInfo) {
    --- End diff --

    This constructor is not required. We should always use the result of the `AvroRecordClassConverter`.

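To illustrate the point about the extra constructor, here is a minimal, Flink-free sketch, assuming the row type is always derived from the record class in one place. `TypeInfoSketch`, `RowType`, `convert`, `RowDeserializationSchema`, `TableSource`, and `User` are hypothetical stand-ins, not Flink or Avro classes; `convert` only plays the role that `AvroRecordClassConverter` plays in the PR and does not reproduce its behavior.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins only; none of these are Flink or Avro classes.
final class TypeInfoSketch {

    /** Stand-in for a row type: a list of "name: type" field descriptions. */
    static final class RowType {
        private final List<String> fields;

        RowType(List<String> fields) {
            this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof RowType && fields.equals(((RowType) o).fields);
        }

        @Override
        public int hashCode() {
            return fields.hashCode();
        }

        @Override
        public String toString() {
            return "Row(" + String.join(", ", fields) + ")";
        }
    }

    /** Single place where a record class is turned into a row type. */
    static RowType convert(Class<?> recordClass) {
        List<String> fields = new ArrayList<>();
        for (Field f : recordClass.getDeclaredFields()) {
            // Skip compiler-generated and static fields.
            if (f.isSynthetic() || Modifier.isStatic(f.getModifiers())) {
                continue;
            }
            fields.add(f.getName() + ": " + f.getType().getSimpleName());
        }
        // Sorted so the result does not depend on reflection order.
        Collections.sort(fields);
        return new RowType(fields);
    }

    /** The schema takes only the record class; callers cannot pass a type. */
    static final class RowDeserializationSchema {
        private final RowType producedType;

        RowDeserializationSchema(Class<?> recordClass) {
            this.producedType = convert(recordClass);
        }

        RowType getProducedType() {
            return producedType;
        }
    }

    /** The source declares its return type through the same converter. */
    static final class TableSource {
        private final Class<?> recordClass;

        TableSource(Class<?> recordClass) {
            this.recordClass = recordClass;
        }

        RowType getReturnType() {
            return convert(recordClass);
        }

        RowDeserializationSchema getDeserializationSchema() {
            return new RowDeserializationSchema(recordClass);
        }
    }

    /** Example record class, loosely modeled on the fields in the report. */
    static final class User {
        Integer id;
        String name;
        Integer age;
    }
}
```

Because both sides call the same `convert`, the type declared by the source and the type produced by the schema cannot drift apart, and a constructor that accepts a separately supplied type (possibly `null`) is unnecessary.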
> KafkaAvroTableSource failed to work due to type mismatch
> --------------------------------------------------------
>
>                 Key: FLINK-9384
>                 URL: https://issues.apache.org/jira/browse/FLINK-9384
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Table API & SQL
>    Affects Versions: 1.6.0
>            Reporter: Jun Zhang
>            Priority: Blocker
>              Labels: easyfix, patch
>             Fix For: 1.6.0
>
>         Attachments: flink-9384.patch
>
>
> An exception was thrown when using KafkaAvroTableSource as follows:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> TableSource of type
> org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSource returned
> a DataStream of type GenericType<org.apache.flink.types.Row> that does not
> match with the type Row(id: Integer, name: String, age: Integer, event:
> GenericType<java.util.Map>) declared by the TableSource.getReturnType()
> method. Please validate the implementation of the TableSource.
>     at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:100)
>     at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
>     at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:279)
>     at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>     at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>     at org.apache.flink.quickstart.StreamingJobAvro.main(StreamingJobAvro.java:85)
>
> It is caused by a discrepancy between the type returned by the TableSource
> and the type returned by the DataStream. I've already fixed it, would someone
> please review the patch and see if it could be merged.
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)