[ https://issues.apache.org/jira/browse/FLINK-20801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256431#comment-17256431 ]
Chesnay Schepler commented on FLINK-20801: ------------------------------------------ No, async operations are relegated to Async IO, and will not make it into the core functions. Why doesn't it suit your needs? Maybe we can extend it to fulfill your use-case. > Using asynchronous methods in operators causes serialization problems > --------------------------------------------------------------------- > > Key: FLINK-20801 > URL: https://issues.apache.org/jira/browse/FLINK-20801 > Project: Flink > Issue Type: Bug > Components: Runtime / Network > Affects Versions: 1.11.2 > Environment: * flink 1.11.2 > * java8 > * windows > Reporter: lafeier > Priority: Major > > Using asynchronous methods in operators causes serialization problems. > Exceptions are indeterminate, for example: > {code:java} > java.io.IOException: Corrupt stream, found tag: 21java.io.IOException: > Corrupt stream, found tag: 21 at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at > java.lang.Thread.run(Thread.java:748) > {code} > > {code:java} > java.lang.RuntimeException: Cannot instantiate > class.java.lang.RuntimeException: Cannot instantiate class. at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:385) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: e11 name12 > {code} > {code:java} > org.apache.flink.types.NullKeyFieldException: Unable to access field > java.lang.String cn.ubattery.Person.name on object > nullorg.apache.flink.types.NullKeyFieldException: Unable to access field > java.lang.String cn.ubattery.Person.name on object null at > org.apache.flink.api.java.typeutils.runtime.PojoComparator.accessField(PojoComparator.java:181) > at > org.apache.flink.api.java.typeutils.runtime.PojoComparator.extractKeys(PojoComparator.java:329) > at > org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:185) > at > org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:162) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:465) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:454) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:160) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) > at > 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at > java.lang.Thread.run(Thread.java:748) > {code} > It is only reproduced in the following order: > * flatMap (Multithreaded asynchronous operations are used in this method) > * keyBy ( Only the subsequent use of the Keyby operator will cause this > problem) > * flatMap > {color:#00875a}*code*{color} > {code:java} > @Test > public void testConcurrentKryoException() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > List<Person> list = new ArrayList<>(); > for (int i = 0; i < 100; i++) { > Person p = new Person("name" + i, i); > list.add(p); > } > DataStreamSource<Person> ds = env.fromCollection(list); > ds.flatMap(new FlatMapFunction<Person, Person>() { > @Override > public void flatMap(Person value, Collector<Person> out) throws > Exception { > CompletableFuture.supplyAsync(()->{ > return value; > }).whenComplete((data,ex)->{ > out.collect(data); > }); > } > }).keyBy("name").flatMap(new FlatMapFunction<Person, Person>() { > @Override > public void flatMap(Person value, Collector<Person> out) throws > Exception { > } > }); > 
env.execute("test"); > }{code} > {code:java} > @Data > @AllArgsConstructor > @NoArgsConstructor > public class Person { > String name; > int age; > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)