Andrey Starostin created FLINK-38054:
----------------------------------------

Summary: Kryo Deserialization Error in ElasticSearch 8 Sink 3.1.0 With Flink 1.20.0
Key: FLINK-38054
URL: https://issues.apache.org/jira/browse/FLINK-38054
Project: Flink
Issue Type: Bug
Components: Connectors / ElasticSearch
Affects Versions: 1.20.0, elasticsearch-3.1.0
Environment: The issue is observed in AWS Managed Flink 1.20.0 and when running the job on a local Flink 1.20.0 cluster. The issue is not observed when running the job with a MiniCluster.
Reporter: Andrey Starostin

When a Flink job that uses the ElasticSearch 8 Sink is submitted as a fat JAR to a Flink cluster, and the job takes a checkpoint while ElasticSearch operations are still buffered and then fails, it cannot recover from that checkpoint. The following error prevents recovery:

```
com.esotericsoftware.kryo.KryoException: Unable to find class: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
Serialization trace:
bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39)
    at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
    at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
    at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 29 more
```

The `UpdateOperation` class that fails to load is present in the JAR. I have attempted to dig deeper, and the class appears not to be visible from the context of the ElasticSearch connector's `OperationSerializer` class.

I have produced a minimal reproduction example in this GitHub repository: [https://github.com/mayorandrew/flink-elasticsearch-deserialization-bug].

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
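
The visibility problem suspected in the report can be illustrated with plain Java, without Flink or Kryo. In the trace, the failing lookup goes through `Class.forName` and ends up in `ClassLoaders$AppClassLoader`, which would not see a class that is only reachable through a separate class loader for the job's fat JAR. The sketch below only simulates that situation: `UserJarOnlyClass`, `HidingLoader`, and `canResolve` are hypothetical names invented for this illustration, and the "framework" loader is a stand-in that deliberately hides one class. It is not the connector's code and not a confirmed diagnosis.

```java
import java.util.Set;

public class ClassLoaderVisibilityDemo {

    /** Hypothetical stand-in for a class that ships only in the job's fat JAR. */
    static class UserJarOnlyClass {}

    /**
     * Simulated "framework" loader: delegates to its parent for everything
     * except the hidden names, which it reports as missing. This mimics a
     * loader that cannot see classes packaged in the user JAR.
     */
    static class HidingLoader extends ClassLoader {
        private final Set<String> hidden;

        HidingLoader(ClassLoader parent, Set<String> hidden) {
            super(parent);
            this.hidden = hidden;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (hidden.contains(name)) {
                throw new ClassNotFoundException(name);
            }
            return super.loadClass(name, resolve);
        }
    }

    /** Looks a class up by name through an explicit loader, as a name-based deserializer would. */
    static boolean canResolve(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = UserJarOnlyClass.class.getName();
        // Assumption: the loader that loaded this demo plays the role of the user-code loader.
        ClassLoader userLoader = ClassLoaderVisibilityDemo.class.getClassLoader();
        ClassLoader frameworkLoader = new HidingLoader(userLoader, Set.of(name));

        System.out.println("framework loader resolves it: " + canResolve(name, frameworkLoader));
        System.out.println("user loader resolves it:      " + canResolve(name, userLoader));
    }
}
```

Running `main` reports `false` for the simulated framework loader and `true` for the user loader: the same class name resolves or fails depending only on which loader performs the lookup. That would also be consistent with the MiniCluster run succeeding, if all classes share one class path there, though the report does not confirm this.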