[ https://issues.apache.org/jira/browse/FLINK-38054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-38054:
-----------------------------------
    Labels: pull-request-available  (was: )

> Kryo deserialization error in ElasticSearch 8 sink 3.1.0 with Flink 1.20.0
> --------------------------------------------------------------------------
>
>                 Key: FLINK-38054
>                 URL: https://issues.apache.org/jira/browse/FLINK-38054
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: elasticsearch-3.1.0, 1.20.0
>         Environment: The issue is observed in AWS Managed Flink 1.20.0 and 
> on a local Flink 1.20.0 cluster. 
> It is not observed when running the Job in a MiniCluster.
>            Reporter: Andrey Starostin
>            Priority: Major
>              Labels: pull-request-available
>
> When a Flink Job that uses the ElasticSearch 8 Sink is submitted as a fat 
> JAR to a Flink cluster, takes a checkpoint while ElasticSearch operations 
> are still buffered, and then fails, it cannot recover from that 
> checkpoint.
>
> The following error prevents recovery:
>
> {code:java}
> com.esotericsoftware.kryo.KryoException: Unable to find class: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
> Serialization trace:
> bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
> at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51)
> at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39)
> at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30)
> at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81)
> at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39)
> at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
> at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
> at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
> at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
> at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103)
> at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148)
> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.ClassNotFoundException: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
> at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
> at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> at java.base/java.lang.Class.forName0(Native Method)
> at java.base/java.lang.Class.forName(Class.java:398)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> ... 29 more{code}
>
> The {{UpdateOperation}} class that fails to load is present in the fat JAR.
>
> Digging deeper, it appears that this class is not visible to the class 
> loader used by the ElasticSearch connector's {{OperationSerializer}}: the 
> {{Caused by}} section of the trace shows the lookup going through 
> {{AppClassLoader}}.
>
> I have produced a minimal reproduction example in the GitHub repository: 
> [https://github.com/mayorandrew/flink-elasticsearch-deserialization-bug].
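
The trace ends in {{AppClassLoader}} rather than in Flink's user-code class loader. One plausible reading, stated here as an assumption and not as the confirmed root cause: Kryo resolves class names through its own configured class loader, which by default is the loader that loaded Kryo itself (Flink's {{lib/}} classpath), so classes that exist only in the job's fat JAR are invisible to it. In a MiniCluster the job classes usually sit on the same application classpath, which would be consistent with the issue not reproducing there. The JDK-only sketch below illustrates the mechanism; {{ClassLoaderVisibilityDemo}}, {{Payload}}, {{isolatedLoader()}} and {{canResolve()}} are hypothetical names, with {{Payload}} standing in for {{UpdateOperation}} and the isolated loader standing in for a loader that cannot see the job JAR.

{code:java}
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Hypothetical, JDK-only illustration: looking up a class by name succeeds
 * only if the ClassLoader used for the lookup (or one of its parents) can
 * see that class.
 */
public class ClassLoaderVisibilityDemo {

    /** Stand-in for a class that exists only in the user fat JAR. */
    public static class Payload {}

    /**
     * Stand-in for a loader that cannot see user classes. With no URLs and a
     * null parent it delegates only to the bootstrap loader, so it finds
     * java.* classes but not the classes of this demo.
     */
    public static URLClassLoader isolatedLoader() {
        return new URLClassLoader(new URL[0], null);
    }

    /** Name-based lookup against an explicit loader, as a class resolver would do it. */
    public static boolean canResolve(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        String name = Payload.class.getName();
        ClassLoader userLoader = ClassLoaderVisibilityDemo.class.getClassLoader();
        try (URLClassLoader isolated = isolatedLoader()) {
            // The loader that loaded this class can see Payload.
            System.out.println(canResolve(name, userLoader));
            // The isolated loader cannot, mirroring the ClassNotFoundException above.
            System.out.println(canResolve(name, isolated));
            // JDK classes remain visible through the bootstrap loader.
            System.out.println(canResolve("java.lang.String", isolated));
        }
    }
}
{code}

Run as-is, it prints {{true}}, {{false}}, {{true}}. If the assumption above holds, a fix would make the connector's Kryo instance resolve classes through the user-code class loader (for example via Kryo's {{setClassLoader}} before deserializing); this is a hedged suggestion, not the patch linked to this issue.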



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
