Thomas Bay created FLINK-37830:
----------------------------------

             Summary: Restore checkpoint state in the ES connector fails with 
NPE on ArrayList when using flink-connector-elasticsearch8 ver. 3.1.0-1.20 with 
flink ver. 1.20.1
                 Key: FLINK-37830
                 URL: https://issues.apache.org/jira/browse/FLINK-37830
             Project: Flink
          Issue Type: Bug
          Components: Connectors / ElasticSearch
         Environment: flink-connector-elasticsearch8 ver. 3.1.0-1.20

flink ver. 1.20.1,
            Reporter: Thomas Bay


When using flink-connector-elasticsearch8 ver. 3.1.0-1.20 with flink ver. 
1.20.1, restore checkpoint state in the connector fails the 
NullPointerException for ArrayList:
{noformat}
2025-05-12 08:14:09 com.esotericsoftware.kryo.KryoException: 
java.lang.NullPointerException Serialization trace: _children 
(com.fasterxml.jackson.databind.node.ArrayNode) _children 
(com.fasterxml.jackson.databind.node.ObjectNode) _children 
(com.fasterxml.jackson.databind.node.ObjectNode) _children 
(com.fasterxml.jackson.databind.node.ObjectNode) _children 
(com.fasterxml.jackson.databind.node.ObjectNode) document 
(co.elastic.clients.elasticsearch.core.bulk.IndexOperation) 
bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation) 
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) 
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
 at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) 
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
 at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) 
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
 at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) 
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
 at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) 
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at 
org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51)
 at 
org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39)
 at 
org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30)
 at 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81)
 at 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39)
 at 
org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
 at 
org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
 at java.base/java.util.Iterator.forEachRemaining(Unknown Source) at 
org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119) at 
org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103)
 at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:163)
 at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
 at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at 
java.base/java.lang.Thread.run(Unknown Source) Caused by: 
java.lang.NullPointerException at 
java.base/java.util.ArrayList.ensureCapacity(Unknown Source) at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:96)
 at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) 
... 56 more {noformat}
{{}}

The fix is to use kryo StdInstantiatorStrategy() as fallback instantiator 
strategy instead of main - like in other places in flink code.

The fix is included in [Pull Request 
126|https://github.com/apache/flink-connector-elasticsearch/pull/126] 

The committed fixed the problem for me.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to