[ https://issues.apache.org/jira/browse/FLINK-37830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Thomas Bay updated FLINK-37830: ------------------------------- Description: When using flink-connector-elasticsearch8 ver. 3.1.0-1.20 with flink ver. 1.20.1, restore checkpoint state in the connector fails the NullPointerException for ArrayList: {noformat} 2025-05-12 08:14:09 com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace: _children (com.fasterxml.jackson.databind.node.ArrayNode) _children (com.fasterxml.jackson.databind.node.ObjectNode) _children (com.fasterxml.jackson.databind.node.ObjectNode) _children (com.fasterxml.jackson.databind.node.ObjectNode) _children (com.fasterxml.jackson.databind.node.ObjectNode) document (co.elastic.clients.elasticsearch.core.bulk.IndexOperation) bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51) at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39) at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30) at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81) at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39) at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227) at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138) at java.base/java.util.Iterator.forEachRemaining(Unknown Source) at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119) at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:163) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Unknown Source) Caused by: java.lang.NullPointerException at java.base/java.util.ArrayList.ensureCapacity(Unknown Source) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:96) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ... 56 more {noformat} The fix is to use kryo StdInstantiatorStrategy() as fallback instantiator strategy instead of main - like in other places in flink code. The fix is included in [Pull Request 126|https://github.com/apache/flink-connector-elasticsearch/pull/126] The committed fixed the problem for me. was: When using flink-connector-elasticsearch8 ver. 3.1.0-1.20 with flink ver. 1.20.1, restore checkpoint state in the connector fails the NullPointerException for ArrayList: {noformat} 2025-05-12 08:14:09 com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace: _children (com.fasterxml.jackson.databind.node.ArrayNode) _children (com.fasterxml.jackson.databind.node.ObjectNode) _children (com.fasterxml.jackson.databind.node.ObjectNode) _children (com.fasterxml.jackson.databind.node.ObjectNode) _children (com.fasterxml.jackson.databind.node.ObjectNode) document (co.elastic.clients.elasticsearch.core.bulk.IndexOperation) bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51) at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39) at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30) at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81) at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39) at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227) at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138) at java.base/java.util.Iterator.forEachRemaining(Unknown Source) at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119) at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:163) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Unknown Source) Caused by: java.lang.NullPointerException at java.base/java.util.ArrayList.ensureCapacity(Unknown Source) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:96) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ... 56 more {noformat} {{}} The fix is to use kryo StdInstantiatorStrategy() as fallback instantiator strategy instead of main - like in other places in flink code. The fix is included in [Pull Request 126|https://github.com/apache/flink-connector-elasticsearch/pull/126] The committed fixed the problem for me. > Restore checkpoint state in the ES connector fails with NPE on ArrayList when > using flink-connector-elasticsearch8 ver. 3.1.0-1.20 with flink ver. 1.20.1 > --------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-37830 > URL: https://issues.apache.org/jira/browse/FLINK-37830 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch > Environment: flink-connector-elasticsearch8 ver. 3.1.0-1.20 > flink ver. 1.20.1, > Reporter: Thomas Bay > Priority: Major > Labels: pull-request-available > > When using flink-connector-elasticsearch8 ver. 3.1.0-1.20 with flink ver. > 1.20.1, restore checkpoint state in the connector fails the > NullPointerException for ArrayList: > {noformat} > 2025-05-12 08:14:09 com.esotericsoftware.kryo.KryoException: > java.lang.NullPointerException Serialization trace: _children > (com.fasterxml.jackson.databind.node.ArrayNode) _children > (com.fasterxml.jackson.databind.node.ObjectNode) _children > (com.fasterxml.jackson.databind.node.ObjectNode) _children > (com.fasterxml.jackson.databind.node.ObjectNode) _children > (com.fasterxml.jackson.databind.node.ObjectNode) document > (co.elastic.clients.elasticsearch.core.bulk.IndexOperation) > bulkOperationVariant > (org.apache.flink.connector.elasticsearch.sink.Operation) at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) > at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) > at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) > at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) > at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at > org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51) > at > org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39) > at > org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30) > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81) > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39) > at > org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227) > at > org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138) > at java.base/java.util.Iterator.forEachRemaining(Unknown Source) at > org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119) > at > org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103) > at > org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:163) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at > java.base/java.lang.Thread.run(Unknown Source) Caused by: > java.lang.NullPointerException at > java.base/java.util.ArrayList.ensureCapacity(Unknown Source) at > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:96) > at > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > ... 56 more > {noformat} > The fix is to use kryo StdInstantiatorStrategy() as fallback instantiator > strategy instead of main - like in other places in flink code. > The fix is included in [Pull Request > 126|https://github.com/apache/flink-connector-elasticsearch/pull/126] > The committed fixed the problem for me. -- This message was sent by Atlassian Jira (v8.20.10#820010)