Myracle opened a new pull request, #27812:
URL: https://github.com/apache/flink/pull/27812

   ## What is the purpose of the change
   
   `FlinkScalaKryoInstantiator.newKryo()` sets the Kryo `InstantiatorStrategy` 
to a pure `StdInstantiatorStrategy` (Objenesis), which creates all object 
instances **by bypassing constructors entirely**. This is inconsistent with the 
fallback path in `KryoSerializer.getKryoInstance()`, which uses 
`DefaultInstantiatorStrategy` (tries no-arg constructors first) with 
`StdInstantiatorStrategy` as a fallback.
   
   When `flink-table-api-scala` is on the classpath, `KryoSerializer` loads 
`FlinkScalaKryoInstantiator` via reflection and uses the Kryo instance it 
creates. Any class that relies on its no-arg constructor to initialize internal 
state will then fail during deserialization, because the constructor is never 
invoked. A concrete example is Apache Iceberg's `SerializableByteBufferMap`, 
which initializes its internal `wrapped` map (`Map<Integer, ByteBuffer>`) in 
the no-arg constructor. During deserialization with Kryo's `MapSerializer`, 
`MapSerializer.create()` calls `kryo.newInstance()`, which under 
`StdInstantiatorStrategy` skips the constructor and leaves `wrapped` as `null`. 
`MapSerializer.read()` then calls `map.put(key, value)`, which delegates to 
`wrapped.put()` and throws a `NullPointerException`.
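
   The failure mode described above can be reproduced without Kryo or Iceberg. 
The following is a minimal stdlib-only sketch, not code from this pull request: 
`WrappingMap` is a hypothetical stand-in for a map wrapper such as 
`SerializableByteBufferMap`, and `allocateWithoutConstructor` uses 
`sun.misc.Unsafe.allocateInstance` (reached via reflection) as an assumed 
approximation of Objenesis-style instantiation.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

/** Stdlib-only illustration; no Kryo, Objenesis, or Iceberg classes are used. */
class ConstructorBypassDemo {

    /** Hypothetical map wrapper that initializes its state in the no-arg constructor. */
    static class WrappingMap {
        private final Map<Integer, String> wrapped;

        WrappingMap() {
            this.wrapped = new HashMap<>();
        }

        String put(Integer key, String value) {
            // Throws NullPointerException if the constructor never ran.
            return wrapped.put(key, value);
        }
    }

    /** Allocates an instance without running any constructor (assumed stand-in for Objenesis). */
    static <T> T allocateWithoutConstructor(Class<T> type) {
        try {
            Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
            Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            Object unsafe = theUnsafe.get(null);
            Object instance =
                    unsafeClass.getMethod("allocateInstance", Class.class).invoke(unsafe, type);
            return type.cast(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not allocate " + type, e);
        }
    }

    /** Returns true if put() fails with an NPE on an instance created without its constructor. */
    static boolean putFailsAfterBypass() {
        WrappingMap map = allocateWithoutConstructor(WrappingMap.class);
        try {
            map.put(1, "a");
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Normal construction: the backing map exists, so put() succeeds.
        new WrappingMap().put(1, "a");
        // Constructor bypassed: the backing map is null, so put() fails.
        System.out.println("NPE after constructor bypass: " + putFailsAfterBypass());
    }
}
```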
   
   This pull request aligns the `InstantiatorStrategy` of 
`FlinkScalaKryoInstantiator` with the default behavior in 
`KryoSerializer.getKryoInstance()`: `DefaultInstantiatorStrategy` is the 
primary strategy and `StdInstantiatorStrategy` is the fallback, so that no-arg 
constructors are invoked whenever they are available.
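
   The ordering adopted here can be sketched in plain Java: try the no-arg 
constructor first, and allocate without a constructor only when none exists. 
This is an illustrative simplification, not Kryo's actual 
`DefaultInstantiatorStrategy` implementation. The names 
`InstantiationFallbackDemo`, `newInstance`, and `allocateWithoutConstructor` 
are hypothetical, and `sun.misc.Unsafe.allocateInstance` is assumed here as a 
stand-in for Objenesis.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/** Stdlib-only sketch of "constructor first, constructor-less allocation as fallback". */
class InstantiationFallbackDemo {

    /** Has a no-arg constructor that initializes its field. */
    static class WithNoArgCtor {
        final List<String> items;

        WithNoArgCtor() {
            this.items = new ArrayList<>();
        }
    }

    /** Has no no-arg constructor, so only the fallback can create it. */
    static class WithoutNoArgCtor {
        final List<String> items;

        WithoutNoArgCtor(int capacity) {
            this.items = new ArrayList<>(capacity);
        }
    }

    /** Primary path: reflection on the no-arg constructor. Fallback: skip constructors. */
    static <T> T newInstance(Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (NoSuchMethodException noNoArgConstructor) {
            return allocateWithoutConstructor(type);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No-arg constructor failed for " + type, e);
        }
    }

    /** Allocates an instance without running any constructor (assumed stand-in for Objenesis). */
    static <T> T allocateWithoutConstructor(Class<T> type) {
        try {
            Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
            Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            Object unsafe = theUnsafe.get(null);
            Object instance =
                    unsafeClass.getMethod("allocateInstance", Class.class).invoke(unsafe, type);
            return type.cast(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not allocate " + type, e);
        }
    }

    public static void main(String[] args) {
        // Constructor ran, so the field is initialized.
        System.out.println(newInstance(WithNoArgCtor.class).items != null);
        // Fallback path: the object exists, but its field was never initialized.
        System.out.println(newInstance(WithoutNoArgCtor.class).items == null);
    }
}
```

   With this ordering, classes that set up state in a no-arg constructor get 
that state, while classes without one can still be instantiated, at the cost 
of uninitialized fields that the serializer must then populate.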
   
   ## Brief change log
   
   - Changed `FlinkScalaKryoInstantiator.newKryo()` to use 
`DefaultInstantiatorStrategy` with `StdInstantiatorStrategy` as a fallback, 
instead of a pure `StdInstantiatorStrategy`. This ensures Kryo first attempts 
to invoke no-arg constructors (via reflection) and only falls back to Objenesis 
(bypassing constructors) when no suitable constructor is found.
   - Added `FlinkScalaKryoInstantiatorTest` to verify:
     - The `InstantiatorStrategy` is correctly configured as 
`DefaultInstantiatorStrategy` with `StdInstantiatorStrategy` fallback.
     - Classes with no-arg constructors have their fields properly initialized 
during Kryo instantiation.
     - Serialization round-trip works correctly for objects with Map fields 
initialized in the constructor.
     - Classes without no-arg constructors still work via the Objenesis 
fallback.
     - `KryoSerializer` (which loads `FlinkScalaKryoInstantiator` via 
reflection) produces a Kryo instance with the correct strategy.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added 
`FlinkScalaKryoInstantiatorTest.testInstantiatorStrategyIsDefaultWithFallback()`, 
which verifies that the Kryo instance uses `DefaultInstantiatorStrategy` as the 
primary strategy with `StdInstantiatorStrategy` as the fallback.
   - Added 
`FlinkScalaKryoInstantiatorTest.testClassWithNoArgConstructorIsProperlyInitialized()`, 
which verifies that `kryo.newInstance()` invokes the no-arg constructor and 
properly initializes fields (simulating Iceberg's `SerializableByteBufferMap` 
scenario).
   - Added 
`FlinkScalaKryoInstantiatorTest.testSerializationRoundTripWithMapField()`, 
which performs an end-to-end serialize/deserialize round-trip for an object 
with a Map field initialized in the constructor, reproducing the exact NPE 
failure scenario.
   - Added 
`FlinkScalaKryoInstantiatorTest.testClassWithoutNoArgConstructorUsesObjenesisFallback()`, 
which verifies that classes without no-arg constructors can still be 
instantiated via the Objenesis fallback strategy.
   - Added 
`FlinkScalaKryoInstantiatorTest.testKryoSerializerUsesCorrectStrategy()`, 
which verifies that `KryoSerializer` (which loads `FlinkScalaKryoInstantiator` 
via reflection when it is on the classpath) produces a Kryo instance with the 
correct `InstantiatorStrategy`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: yes (fixes the Kryo `InstantiatorStrategy` used by 
`FlinkScalaKryoInstantiator`, ensuring no-arg constructors are invoked during 
Kryo deserialization when `flink-table-api-scala` is on the classpath)
     - The runtime per-record code paths (performance sensitive): no (the 
`DefaultInstantiatorStrategy` adds a negligible reflection-based constructor 
lookup before falling back to Objenesis; for classes with registered custom 
serializers, `newInstance` is not called by Kryo at all)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
