Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/6026#discussion_r188786194 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java --- @@ -81,7 +85,12 @@ public String explainSource() { @Override protected AvroRowDeserializationSchema getDeserializationSchema() { - return new AvroRowDeserializationSchema(avroRecordClass); + return new AvroRowDeserializationSchema(avroRecordClass, tableSchemaToReturnType(schema)); + } + + /** Converts the table schema into into the return type. */ + private static RowTypeInfo tableSchemaToReturnType(TableSchema tableSchema) { --- End diff -- Do you need this extra function, I think just inlining it should be fine.
---