Hi Julia, I have read the code for this part. As you said, the problem is that the RowType passed to the avro-confluent format is nullable, which causes a union with null in the generated schema. I think FLINK-30438 describes the same problem as yours. However, I found that the RowType passed to the avro-confluent format in the Kafka connector is not nullable (it comes from `getCatalogTable().getResolvedSchema().toPhysicalRowDataType()`).
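To make the mechanism concrete, here is a minimal, self-contained Java sketch. It is not Flink's code: `RowTypeSketch` and its methods are hypothetical stand-ins that only model the idea discussed in this thread, namely that a row type is nullable unless told otherwise, that a `notNull()`-style call switches nullability off, and that a nullable row is rendered as a union with null in an Avro-style schema. The record name `record` and the field `id` are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical, simplified stand-in for a row type; this is NOT Flink's RowType. */
final class RowTypeSketch {
    final boolean nullable;
    final Map<String, String> fields; // field name -> Avro primitive type name

    RowTypeSketch(boolean nullable, Map<String, String> fields) {
        this.nullable = nullable;
        this.fields = fields;
    }

    /** Models the idea of a notNull() call: same fields, nullability switched off. */
    RowTypeSketch notNull() {
        return new RowTypeSketch(false, fields);
    }

    /** Renders an Avro-style schema string; a nullable row becomes a union with "null". */
    String toAvroSchema() {
        String fieldJson = fields.entrySet().stream()
                .map(e -> "{\"name\":\"" + e.getKey() + "\",\"type\":\"" + e.getValue() + "\"}")
                .collect(Collectors.joining(","));
        String record = "{\"type\":\"record\",\"name\":\"record\",\"fields\":[" + fieldJson + "]}";
        return nullable ? "[\"null\"," + record + "]" : record;
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("id", "long");

        // Nullable row, modelling the default described in the thread.
        RowTypeSketch row = new RowTypeSketch(true, fields);
        System.out.println(row.toAvroSchema());           // union with "null"
        System.out.println(row.notNull().toAvroSchema()); // plain record
    }
}
```

In this model the only difference between the two outputs is the nullability flag on the top-level row, which is why supplying a non-nullable row type to the format should be enough to drop the union.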
For your case, you would have to modify the code at FileSystemTableSink.java#L299 <https://github.com/apache/flink/blob/release-1.16.0/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L299> so that it provides a non-nullable RowType. Alternatively, you could contribute a fix for the filesystem connector.

Best,
Hang

julia bogdan <juliabogda...@gmail.com> wrote on Wed, Nov 15, 2023 at 01:10:

> Hi!
>
> I'm facing an issue with the output schema for FileSystemTableSink.
> In FileSystemTableSink#createWriter (FileSystemTableSink.java#L299
> <https://github.com/apache/flink/blob/release-1.16.0/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L299>)
> the original nullability of the underlying logical data type is not
> preserved, which introduces an unnecessary union with null in the schema,
> i.e. for Avro, it generates [null, {"type":"record", "fields": ...}]
> instead of {"type":"record", "fields": ...}.
> https://issues.apache.org/jira/browse/FLINK-30438 describes the same
> problem, but I am not sure whether the root cause is the same.
> We use Flink 1.16.0, but the issue is also relevant for newer versions.
>
> Looking at the source code, the issue exists because DataType::ROW
> instantiates RowType with isNullable = true by default (constructor here
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java#L161-L163>).
> A similar DataType creation in DataTypeUtils
> <https://github.com/apache/flink/blob/1.16/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java#L124-L127>
> is followed by a nullability check and a call to .notNull().
>
> I wonder whether someone has had the same issue and whether there is a
> workaround.
>
> Thank you,
> Yuliya