GitHub user tragicjun commented on a diff in the pull request: https://github.com/apache/flink/pull/6026#discussion_r189437087 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java --- @@ -81,7 +85,12 @@ public String explainSource() { @Override protected AvroRowDeserializationSchema getDeserializationSchema() { - return new AvroRowDeserializationSchema(avroRecordClass); + return new AvroRowDeserializationSchema(avroRecordClass, tableSchemaToReturnType(schema)); + } + + /** Converts the table schema into the return type. */ + private static RowTypeInfo tableSchemaToReturnType(TableSchema tableSchema) { --- End diff -- The extra function has been removed.
---