Andriy Redko created FLINK-30718:
------------------------------------
Summary: Cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter
Key: FLINK-30718
URL: https://issues.apache.org/jira/browse/FLINK-30718
Project: Flink
Issue Type: Bug
Components: Connectors / Opensearch
Affects Versions: 1.16.0
Reporter: Andriy Redko
When using OpenSearchSink programmatically
{noformat}
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.createRemoteEnvironment("localhost", 8081);
final Collection<Tuple4<String, String, Long, Long>> users = new
ArrayList<>();
users.add(Tuple4.of("u1", "admin", 100L, 200L));
final DataStream<Tuple4<String, String, Long, Long>> source =
env.fromCollection(users);
final OpensearchSink<Tuple4<String, String, Long, Long>> sink =
new OpensearchSinkBuilder<Tuple4<String, String, Long, Long>>()
.setHosts(new HttpHost("localhost", 9200, "https"))
.setEmitter( (element, ctx, indexer) -> {
indexer.add(
Requests
.indexRequest()
.index("users")
.id(element.f0)
.source(Map.ofEntries(
Map.entry("user_id", element.f0),
Map.entry("user_name", element.f1),
Map.entry("uv", element.f2),
Map.entry("pv", element.f3)
)));
})
.setConnectionUsername("admin")
.setConnectionPassword("admin")
.setAllowInsecure(true)
.setBulkFlushMaxActions(1)
.build();
source.sinkTo(sink);
env.execute("Opensearch end to end sink test example");
{noformat}
the stream processing fails with the exception{color:#000000}
{color}
{noformat}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
instantiate user function.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:399)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter of type
org.apache.flink.connector.opensearch.sink.OpensearchEmitter in instance of
org.apache.flink.connector.opensearch.sink.OpensearchSink
at
java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
at
java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
at
java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
at
java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
at
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
at
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at
java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
at
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
at
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
at
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
... 9 more
{noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)