[ https://issues.apache.org/jira/browse/FLINK-20801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256230#comment-17256230 ]
lafeier commented on FLINK-20801: --------------------------------- Well, I tried Async IO and it didn't meet my needs. This issue is a result of Kryo serialization concurrency. Will this issue be resolved in the future? > Using asynchronous methods in operators causes serialization problems > --------------------------------------------------------------------- > > Key: FLINK-20801 > URL: https://issues.apache.org/jira/browse/FLINK-20801 > Project: Flink > Issue Type: Bug > Components: Runtime / Network > Affects Versions: 1.11.2 > Environment: * flink 1.11.2 > * java8 > * windows > Reporter: lafeier > Priority: Major > > Using asynchronous methods in operators causes serialization problems. > Exceptions are indeterminate, for example: > {code:java} > java.io.IOException: Corrupt stream, found tag: 21java.io.IOException: > Corrupt stream, found tag: 21 at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at > java.lang.Thread.run(Thread.java:748) > {code} > > {code:java} > java.lang.RuntimeException: Cannot instantiate > class.java.lang.RuntimeException: Cannot instantiate class. at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:385) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.ClassNotFoundException: e11 name12 > {code} > {code:java} > org.apache.flink.types.NullKeyFieldException: Unable to access field > java.lang.String cn.ubattery.Person.name on object > nullorg.apache.flink.types.NullKeyFieldException: Unable to access field > java.lang.String cn.ubattery.Person.name on object null at > org.apache.flink.api.java.typeutils.runtime.PojoComparator.accessField(PojoComparator.java:181) > at > org.apache.flink.api.java.typeutils.runtime.PojoComparator.extractKeys(PojoComparator.java:329) > at > org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:185) > at > org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:162) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:465) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:454) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:160) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) > at > 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at > java.lang.Thread.run(Thread.java:748) > {code} > The problem is only reproduced when the operators are chained in the following order: > * flatMap (multithreaded asynchronous operations are used in this method) > * keyBy (only the subsequent use of the keyBy operator will cause this > problem) > * flatMap > {color:#00875a}*code*{color} > {code:java} > @Test > public void testConcurrentKryoException() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > List<Person> list = new ArrayList<>(); > for (int i = 0; i < 100; i++) { > Person p = new Person("name" + i, i); > list.add(p); > } > DataStreamSource<Person> ds = env.fromCollection(list); > ds.flatMap(new FlatMapFunction<Person, Person>() { > @Override > public void flatMap(Person value, Collector<Person> out) throws > Exception { > CompletableFuture.supplyAsync(()->{ > return value; > }).whenComplete((data,ex)->{ > out.collect(data); > }); > } > }).keyBy("name").flatMap(new FlatMapFunction<Person, Person>() { > @Override > public void flatMap(Person value, Collector<Person> out) throws > Exception { > } > }); > 
env.execute("test"); > }{code} > {code:java} > @Data > @AllArgsConstructor > @NoArgsConstructor > public class Person { > String name; > int age; > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)