Hi Julia,

I have read the code for this part. As you said, the problem is that the
RowType passed to the avro-confluent format is nullable, which causes a
union with null in the generated schema.
I think FLINK-30438 is the same problem as yours. However, I found that the
RowType passed to the avro-confluent format in the Kafka connector is not
nullable (it comes from
`getCatalogTable().getResolvedSchema().toPhysicalRowDataType()`).

For your case, you would have to modify the code in FileSystemTableSink.java#L299
<https://github.com/apache/flink/blob/release-1.16.0/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L299>
to provide a non-nullable RowType. Alternatively, you could submit a fix for
the filesystem connector.
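
To illustrate why the nullable flag on the top-level row matters, here is a
minimal, self-contained sketch. It is a toy model only: the `Row` class and
`toAvroSchema` method are hypothetical stand-ins I made up for illustration,
not Flink or Avro classes, and the schema string is simplified. The only idea
it borrows is the `.notNull()` call you mentioned.

```java
// Toy model only: these types are hypothetical stand-ins, not Flink or Avro classes.
final class NullableRowSketch {

    /** Minimal stand-in for a row type: a record name plus a nullability flag. */
    static final class Row {
        final String name;
        final boolean nullable;

        Row(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }

        /** Mirrors the idea of calling .notNull(): same row, nullability switched off. */
        Row notNull() {
            return new Row(name, false);
        }
    }

    /** Renders a simplified Avro-style schema; a nullable row becomes a union with null. */
    static String toAvroSchema(Row row) {
        String record = "{\"type\":\"record\",\"name\":\"" + row.name + "\",\"fields\":[]}";
        return row.nullable ? "[\"null\"," + record + "]" : record;
    }

    public static void main(String[] args) {
        // Assumption for the sketch: the row is nullable unless told otherwise.
        Row byDefault = new Row("record", true);
        System.out.println(toAvroSchema(byDefault));           // union with null
        System.out.println(toAvroSchema(byDefault.notNull())); // plain record
    }
}
```

In this model, the only difference between the two outputs is the nullable
flag on the outer row, which is why passing a non-nullable RowType to the
format should remove the union.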

Best,
Hang


julia bogdan <juliabogda...@gmail.com> wrote on Wed, Nov 15, 2023 at 01:10:

> Hi!
>
> I'm facing an issue with the output schema for FileSystemTableSink.
> In FileSystemTableSink#createWriter (FileSystemTableSink.java#L299
> <https://github.com/apache/flink/blob/release-1.16.0/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L299>)
> the original nullability of the underlying logical data type is not
> preserved, which introduces an unnecessary union with null in the schema,
> i.e. for Avro, it generates [null, {"type":"record", "fields": ...}]
> instead of {"type":"record", "fields": ...}.
> https://issues.apache.org/jira/browse/FLINK-30438 describes the same
> problem, but I'm not sure whether the root cause is the same.
> We use Flink 1.16.0, but the issue is also relevant for newer versions.
>
>
> Looking at the source code, the issue exists because DataType::ROW
> instantiates RowType with isNullable = true by default (constructor here
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java#L161-L163>).
> Elsewhere, a similar DataType creation is followed by a nullability check
> and a call to .notNull(), for example in DataTypeUtils
> <https://github.com/apache/flink/blob/1.16/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java#L124-L127>.
>
> I wonder whether someone had the same issue and whether there is a
> workaround.
>
>
> Thank you,
> Yuliya
>
