rodriguezc opened a new pull request, #28488:
URL: https://github.com/apache/flink/pull/28488
## References
This implementation was inspired by PR #28465 (FLINK-29984), which addressed
compatibility between writer and reader schemas. This fix adapts that approach
to enum/string type mismatches while preserving field projection.
## What is the purpose of the change
This pull request fixes a deserialization failure when consuming Avro
records with enum fields from Confluent Schema Registry. When the writer schema
(from Schema Registry) declares a field as an `ENUM` type but the reader schema
(from the Flink DDL) declares the same field as `STRING`, `GenericDatumReader`
fails during deserialization: Avro's schema-resolution rules do not allow a
value written as an `ENUM` to be read as a `STRING`.
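To make the failure concrete, here is a dependency-free Java model of the relevant part of Avro's schema-resolution rules. It is a sketch, not the Avro implementation: the `AvroResolutionSketch` class and its `Type` enum are hypothetical stand-ins for `org.apache.avro.Schema.Type`, and only the type promotions listed in the Avro specification are modeled (enum symbol matching and record recursion are omitted).

```java
import java.util.Map;
import java.util.Set;

/**
 * Dependency-free model of Avro's writer-to-reader type resolution.
 * Hypothetical: does not use the real org.apache.avro API.
 */
final class AvroResolutionSketch {

    enum Type { NULL, BOOLEAN, INT, LONG, FLOAT, DOUBLE, BYTES, STRING, ENUM, RECORD }

    // Promotions the Avro specification allows from a writer type to a
    // different reader type. ENUM -> STRING is deliberately absent.
    private static final Map<Type, Set<Type>> PROMOTIONS = Map.of(
            Type.INT, Set.of(Type.LONG, Type.FLOAT, Type.DOUBLE),
            Type.LONG, Set.of(Type.FLOAT, Type.DOUBLE),
            Type.FLOAT, Set.of(Type.DOUBLE),
            Type.STRING, Set.of(Type.BYTES),
            Type.BYTES, Set.of(Type.STRING));

    /** Returns true if data written as {@code writer} can be read as {@code reader}. */
    static boolean canResolve(Type writer, Type reader) {
        if (writer == reader) {
            return true; // same type; enum symbol checks are omitted in this sketch
        }
        return PROMOTIONS.getOrDefault(writer, Set.of()).contains(reader);
    }

    public static void main(String[] args) {
        System.out.println(canResolve(Type.ENUM, Type.STRING)); // the failing case
        System.out.println(canResolve(Type.ENUM, Type.ENUM));   // the case after the merge
        System.out.println(canResolve(Type.INT, Type.LONG));    // a standard promotion
    }
}
```

In this model, `canResolve(Type.ENUM, Type.STRING)` returns `false`, matching the writer-`ENUM` / reader-`STRING` case above; changing the reader type to `ENUM` is what lets resolution succeed.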
The fix introduces a schema merging strategy in
`RegistryAvroDeserializationSchema.deserialize()` that creates a hybrid
"expected schema":
- **From the reader schema**: Preserves the field list and field order
(enabling field projection, i.e., reading only a subset of fields)
- **From the writer schema**: Substitutes `ENUM` types for fields where the
reader declared `STRING` but the writer has `ENUM`
This allows `GenericDatumReader` to correctly deserialize enum values as
`GenericEnumSymbol` objects, which are then converted to `StringData` by
`AvroToRowDataConverters` via the `.toString()` method.
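The merge rule described above can be sketched as follows. This is a simplified, dependency-free model, not the PR's `mergeSchemaTypes()` code: the `SchemaMergeSketch` class, its `Field` record, and the boolean `nullable` flag (standing in for a `["null", T]` union) are assumptions made for illustration. Fields are matched by name, so a reader that projects a subset of the writer's fields, in a different order, still picks up the writer's `ENUM` type where it declared `STRING`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the reader/writer schema merge; not the Avro Schema API. */
final class SchemaMergeSketch {

    enum Type { STRING, ENUM, INT, LONG }

    /** Simplified field: name, type, and whether it is a ["null", T] union. */
    record Field(String name, Type type, boolean nullable) {}

    /**
     * Keeps the reader's field list and order, but takes ENUM from the writer
     * wherever the reader declared STRING for the same field name.
     */
    static List<Field> mergeSchemaTypes(List<Field> reader, List<Field> writer) {
        Map<String, Field> writerByName = new HashMap<>();
        for (Field f : writer) {
            writerByName.put(f.name(), f);
        }
        List<Field> merged = new ArrayList<>(reader.size());
        for (Field r : reader) {
            Field w = writerByName.get(r.name());
            if (w != null && r.type() == Type.STRING && w.type() == Type.ENUM) {
                // In this sketch, the reader's nullability is kept as-is.
                merged.add(new Field(r.name(), Type.ENUM, r.nullable()));
            } else {
                merged.add(r); // all other fields pass through unchanged
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Field> writer = List.of(
                new Field("id", Type.LONG, false),
                new Field("op", Type.ENUM, false),
                new Field("status", Type.ENUM, true),
                new Field("note", Type.STRING, true));
        // The reader projects a subset, in a different order, declaring enums as STRING.
        List<Field> reader = List.of(
                new Field("status", Type.STRING, true),
                new Field("op", Type.STRING, false));
        System.out.println(mergeSchemaTypes(reader, writer));
    }
}
```

Writer-only fields such as `note` do not appear in the merged schema; under Avro's resolution rules, writer fields absent from the reader schema are skipped when decoding.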
## Brief change log
- Added `mergeSchemaTypes()` method in `RegistryAvroDeserializationSchema`
to create hybrid reader/writer schemas
- The schema merge handles enum/string type mismatches by preferring the
writer's `ENUM` type
- The schema merge preserves the reader schema's field structure so that field
projection keeps working
- Modified `AvroToRowDataConverters` to access `GenericRecord` fields by name
instead of by position
- Added tests for enum deserialization covering enum-to-string conversion,
nullable enums, field projection, and nested records
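The switch to name-based field access can be illustrated with a small model. The PR does not state which case broke positional access, so this sketch rests on an assumption: that the field order of the record handed to the converter can differ from the Flink row type. `SimpleRecord` and `FieldAccessSketch` are hypothetical stand-ins; Avro's real `GenericRecord` does offer both `get(int)` and `get(String)`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model contrasting positional and name-based field access. */
final class FieldAccessSketch {

    /** Stand-in for a GenericRecord: values stored in its own schema's field order. */
    static final class SimpleRecord {
        private final List<String> fieldNames;
        private final Object[] values;

        SimpleRecord(List<String> fieldNames, Object... values) {
            this.fieldNames = fieldNames;
            this.values = values;
        }

        Object get(int pos) {
            return values[pos];
        }

        Object get(String name) {
            int pos = fieldNames.indexOf(name);
            return pos < 0 ? null : values[pos];
        }
    }

    /** Positional conversion: only correct if record and row field orders match. */
    static List<Object> toRowPositional(SimpleRecord record, List<String> rowFields) {
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < rowFields.size(); i++) {
            row.add(record.get(i));
        }
        return row;
    }

    /** Name-based conversion: looks up each row field by name. */
    static List<Object> toRowByName(SimpleRecord record, List<String> rowFields) {
        List<Object> row = new ArrayList<>();
        for (String name : rowFields) {
            row.add(record.get(name));
        }
        return row;
    }

    public static void main(String[] args) {
        SimpleRecord rec = new SimpleRecord(List.of("id", "op", "status"), 42L, "INSERT", "ACTIVE");
        List<String> rowFields = List.of("status", "op");
        System.out.println(toRowPositional(rec, rowFields)); // [42, INSERT]: wrong columns
        System.out.println(toRowByName(rec, rowFields));     // [ACTIVE, INSERT]
    }
}
```

With row fields `[status, op]`, positional access picks up `id` and `op`, while name-based access returns the requested `status` and `op` values regardless of order.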
## Verifying this change
This change added tests and can be verified as follows:
- Added
`RegistryAvroDeserializationSchemaTest.testNestedRecordWithEnumField()`, which
validates that:
  - Enum-to-string conversion works correctly
  - Nullable enum fields are handled properly
  - Field projection (a subset of fields) works with enum types
  - Nested records with enum fields are deserialized correctly
- **Note**: The test schema structure is based on real-world CDC output from
Qlik Replicate, which commonly produces nested records with enum fields (e.g.,
CDC metadata such as operation-type enums)
- Existing tests continue to pass, ensuring no regression in field
projection functionality
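For nested records with enum fields to deserialize correctly, the enum/string substitution presumably has to apply inside nested record types as well. A minimal recursive sketch, assuming the merge descends into record fields present in both schemas; the `NestedMergeSketch` class, its `Node` record, and the field names `envelope`, `metadata`, `operation`, and `timestamp` are hypothetical, and nullability is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical recursive model of the merge; not the PR's implementation. */
final class NestedMergeSketch {

    enum Type { STRING, ENUM, LONG, RECORD }

    /** Simplified schema node; {@code fields} is only used when type == RECORD. */
    record Node(String name, Type type, List<Node> fields) {
        static Node leaf(String name, Type type) {
            return new Node(name, type, List.of());
        }

        static Node recordOf(String name, Node... fields) {
            return new Node(name, Type.RECORD, List.of(fields));
        }
    }

    /** Reader order wins; STRING becomes ENUM where the writer has ENUM; records recurse. */
    static Node merge(Node reader, Node writer) {
        if (reader.type() == Type.STRING && writer.type() == Type.ENUM) {
            return Node.leaf(reader.name(), Type.ENUM);
        }
        if (reader.type() == Type.RECORD && writer.type() == Type.RECORD) {
            Map<String, Node> writerFields = writer.fields().stream()
                    .collect(Collectors.toMap(Node::name, Function.identity()));
            List<Node> merged = new ArrayList<>();
            for (Node r : reader.fields()) {
                Node w = writerFields.get(r.name());
                merged.add(w == null ? r : merge(r, w));
            }
            return new Node(reader.name(), Type.RECORD, merged);
        }
        return reader; // any other combination keeps the reader's type
    }

    public static void main(String[] args) {
        Node writer = Node.recordOf("envelope",
                Node.recordOf("metadata", Node.leaf("operation", Type.ENUM), Node.leaf("timestamp", Type.LONG)),
                Node.leaf("id", Type.LONG));
        Node reader = Node.recordOf("envelope",
                Node.leaf("id", Type.LONG),
                Node.recordOf("metadata", Node.leaf("operation", Type.STRING)));
        System.out.println(merge(reader, writer));
    }
}
```

Here the merged schema keeps the reader's top-level order (`id`, then the nested record), turns the nested `operation` field into `ENUM`, and leaves out the writer-only `timestamp` field.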
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: yes (Avro deserialization logic)
- The runtime per-record code paths (performance sensitive): yes (adds
schema merging on each deserialization)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable (bug fix)
##### Was generative AI tooling used to co-author this PR?
- [X] Yes (please specify the tool below)
Generated-by: Claude Code (Claude Sonnet 4.5)