Andrey Starostin created FLINK-38054:
----------------------------------------

             Summary: Kryo Deserialization Error in ElasticSearch 8 Sink 3.1.0 With Flink 1.20.0
                 Key: FLINK-38054
                 URL: https://issues.apache.org/jira/browse/FLINK-38054
             Project: Flink
          Issue Type: Bug
          Components: Connectors / ElasticSearch
    Affects Versions: 1.20.0, elasticsearch-3.1.0
         Environment: The issue is observed both in AWS Managed Flink 1.20.0 and when running the Job on a local Flink 1.20.0 cluster.

The issue is not observed when running the Job with a MiniCluster.
            Reporter: Andrey Starostin


When a Flink Job that uses the ElasticSearch 8 Sink is submitted as a fat JAR to a Flink cluster, takes a checkpoint while ElasticSearch operations are still buffered in the sink, and then fails, it cannot recover from that checkpoint.

The following error prevents recovery:

```
com.esotericsoftware.kryo.KryoException: Unable to find class: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
Serialization trace:
bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51)
	at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39)
	at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30)
	at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81)
	at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39)
	at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
	at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
	at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
	at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:398)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
	... 29 more
```
 
The failing `UpdateOperation` class is present in the fat JAR.
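One way to double-check that the class is packaged is to look for its `.class` entry inside the JAR. The sketch below uses only `java.util.jar.JarFile` from the JDK; the class name `JarClassCheck` and its command-line arguments are hypothetical, and running `jar tf` on the fat JAR would give the same information.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

public class JarClassCheck {

    /** Converts a binary class name (e.g. a.b.C$D) to its entry path inside a JAR (a/b/C$D.class). */
    public static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the JAR at {@code jarPath} contains a .class entry for {@code className}. */
    public static boolean containsClass(File jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: java JarClassCheck <path-to-fat.jar> [binary.class.Name]");
            return;
        }
        // Defaults to the class named in the ClassNotFoundException above.
        String className = args.length > 1
                ? args[1]
                : "co.elastic.clients.elasticsearch.core.bulk.UpdateOperation";
        File jar = new File(args[0]);
        System.out.println(className + " present in " + jar + ": " + containsClass(jar, className));
    }
}
```

A positive result only shows the class is packaged; it says nothing about which class loader can see it at restore time.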
 
I have attempted to dig deeper, and it appears that this class is not visible from the context of the ElasticSearch connector's `OperationSerializer` class.
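The `Caused by` section of the trace shows `Class.forName` being resolved through `jdk.internal.loader.ClassLoaders$AppClassLoader`, i.e. the JVM's application class loader, rather than a loader that can see the fat JAR. The following is a minimal, JDK-only sketch of that mechanism, not the connector's or Kryo's code: the same `Class.forName(name, false, loader)` call succeeds or fails depending only on which loader is passed in. `UserOnlyClass` is a hypothetical stand-in for `UpdateOperation`, and the parentless `URLClassLoader` is an assumed stand-in for a loader that cannot see the job's classes.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderVisibilityDemo {

    /** Hypothetical stand-in for a class that is packaged only in the job's fat JAR. */
    public static class UserOnlyClass {}

    /** Returns true if {@code className} can be resolved through {@code loader}. */
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            // Mirrors the Class.forName frame in the trace; the exact arguments Kryo passes are assumed.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        String name = UserOnlyClass.class.getName();
        // Plays the role of a loader that can see the job's classes.
        ClassLoader jobLoader = ClassLoaderVisibilityDemo.class.getClassLoader();
        // No URLs and a null parent: it delegates only to the bootstrap loader,
        // so it plays the role of a loader that cannot see the job's classes.
        try (URLClassLoader isolatedLoader = new URLClassLoader(new URL[0], null)) {
            System.out.println("job loader sees " + name + ": " + isVisible(name, jobLoader));
            System.out.println("isolated loader sees " + name + ": " + isVisible(name, isolatedLoader));
        }
    }
}
```

If this reading is correct, it would also be consistent with the MiniCluster observation, since a MiniCluster typically runs the job classes from the same application classpath; that is an inference, not something confirmed here.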
 
I have produced a minimal reproduction example in this GitHub repository: [https://github.com/mayorandrew/flink-elasticsearch-deserialization-bug].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
