featzhang commented on PR #124:
URL:
https://github.com/apache/flink-connector-pulsar/pull/124#issuecomment-4527998590
The change itself is correct and matches the default
`deserialize(byte[], Collector)` implementation in `DeserializationSchema`
(flink-core), which drops `null` returns by contract. Two things are worth
folding into this PR or a follow-up:
1. The same pattern exists in three other wrappers in the same package and
   is not addressed here:
   - `PulsarSchemaWrapper.java:68`
   - `PulsarTypeInformationWrapper.java:61`
   - `GenericRecordDeserializationSchema.java:41`

   All three have the same issue (`out.collect(instance)` without a null
   check). Since the JIRA describes a contract bug, fixing only one wrapper
   leaves the same footgun in the other code paths.
2. The behavior change for the table connector is worth calling out in the PR
   description. In non-upsert mode, a null value previously reached
   `PulsarRowDataConverter#emitRow` and threw
   `DeserializationException("Invalid null value received in non-upsert mode...")`.
   After this PR, the wrapper drops it silently. The new behavior is fine and
   aligns with Kafka, but it should be stated explicitly in the changelog:
   users who currently rely on the exception to detect bad data will now lose
   those records silently. A `LOG.debug` on the dropped record would help
   operability.
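
To make both points concrete, here is a minimal sketch of a shared null guard
that also logs the drop. It is an illustration only, not code from this
repository: `NullSafeEmitter`, `decodeAndCollect`, and `droppedCount` are
hypothetical names, `Consumer<T>` stands in for Flink's `Collector<T>`, and
`java.util.logging` stands in for the SLF4J logger the connector would
presumably use. The drop counter is an extra that goes beyond the `LOG.debug`
suggested above.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical helper, not part of flink-connector-pulsar. Consumer<T> stands in
 * for Flink's Collector<T>, and java.util.logging stands in for SLF4J.
 */
final class NullSafeEmitter {
    private static final Logger LOG = Logger.getLogger(NullSafeEmitter.class.getName());

    // Count of records dropped because the decoder returned null.
    private long dropped;

    /** Decodes the payload and forwards the result only when it is non-null. */
    <T> boolean decodeAndCollect(byte[] payload, String topic,
                                 Function<byte[], T> decoder, Consumer<T> out) {
        T instance = decoder.apply(payload);
        if (instance == null) {
            dropped++;
            // Level.FINE is the java.util.logging counterpart of a debug message.
            LOG.log(Level.FINE, "Dropping null record from topic {0} ({1} dropped so far)",
                    new Object[] {topic, dropped});
            return false;
        }
        out.accept(instance);
        return true;
    }

    long droppedCount() {
        return dropped;
    }
}
```

If something along these lines were shared by all four wrappers, the null
check and the log line would live in one place, and the counter could later
back a metric for users who relied on the old exception.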
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.