[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15304062#comment-15304062 ]
B Wyatt commented on FLINK-3974:
--------------------------------

In the topologies I've been building, object reuse has been a pretty significant CPU win. It would be a shame to lose that capability for topologies where care is taken to actually maintain the application-level requirement of object reuse: don't modify your inputs. Disabling chaining is a fine option if this is not a bottleneck in your topology.

If unchained splits become the default, I'd like to see the ability to chain them remain as an option. Sometimes splitting into multiple chains/threads is good: CPU-heavy operators that benefit from parallelism. Sometimes maintaining the chain to avoid the de/serialization costs is good: CPU-light operators with high throughput.

The attached patch essentially gives each collector its own dedicated stream record with a reference to the value. You are correct that it doesn't solve the problem of mutable operator inputs, but that has always been the primary concession of object reuse, and it doesn't seem like a bad requirement to put on the application level. I don't know of a way to avoid the cost of cloning *and* protect against operators mutating their input in a language like Java.

> enableObjectReuse fails when an operator chains to multiple downstream
> operators
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-3974
>                 URL: https://issues.apache.org/jira/browse/FLINK-3974
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3
>            Reporter: B Wyatt
>         Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
>     .map(MapFunction<A,B>...)
>     .addSink(...);
> input
>     .map(MapFunction<A,C>...)
>     .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}},
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}}, which
> mutates the value stored in the StreamRecord<>.
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the
> {{StreamRecord<A>}} to the second map operation, it is actually a
> {{StreamRecord<B>}}, and the two map operations behave as if they were serial
> instead of parallel.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
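The failure mode in the quoted description, and the per-collector fix the comment describes, can be illustrated with a small stand-alone Java sketch. It does not use Flink at all: `Record`, `ChainedMap`, `collectShared` and `PerOutputCollector` are hypothetical stand-ins for `StreamRecord`, a chained map operator and the chaining output, with `String` and `Integer` playing the roles of `A` and `B`. It is a simplified model under those assumptions, not the attached patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ObjectReuseSketch {

    // Hypothetical stand-in for a stream record: a mutable holder whose value
    // is overwritten in place, as happens under object reuse.
    static final class Record {
        Object value;

        Record(Object value) { this.value = value; }

        Record replace(Object newValue) {
            this.value = newValue; // mutates the record that was passed in
            return this;
        }
    }

    // Hypothetical chained map operator that, like the map in the issue,
    // writes its result back into the incoming record via replace().
    static final class ChainedMap {
        final Function<Object, Object> fn;
        final List<Object> emitted = new ArrayList<>();

        ChainedMap(Function<Object, Object> fn) { this.fn = fn; }

        void process(Record in) {
            emitted.add(in.replace(fn.apply(in.value)).value);
        }
    }

    // A -> B (String -> Integer) and A -> C (String -> String)
    static ChainedMap lengthMap() { return new ChainedMap(v -> ((String) v).length()); }
    static ChainedMap upperMap() { return new ChainedMap(v -> ((String) v).toUpperCase()); }

    // Broadcast as described in the issue: one record shared by all outputs.
    static void collectShared(Object value, List<ChainedMap> outputs) {
        Record shared = new Record(value);
        for (ChainedMap op : outputs) {
            op.process(shared); // the 2nd op sees whatever the 1st op wrote
        }
    }

    // Broadcast in the spirit of the comment: each output owns a dedicated,
    // reusable record that only references the shared value.
    static final class PerOutputCollector {
        final List<ChainedMap> outputs;
        final Record[] dedicated;

        PerOutputCollector(List<ChainedMap> outputs) {
            this.outputs = outputs;
            this.dedicated = new Record[outputs.size()];
            for (int i = 0; i < dedicated.length; i++) {
                dedicated[i] = new Record(null); // allocated once, reused per element
            }
        }

        void collect(Object value) {
            for (int i = 0; i < dedicated.length; i++) {
                dedicated[i].value = value; // share the value, not the record
                outputs.get(i).process(dedicated[i]);
            }
        }
    }

    static boolean sharedRecordFails() {
        try {
            collectShared("hello", List.of(lengthMap(), upperMap()));
            return false;
        } catch (ClassCastException e) {
            return true; // Integer cannot be cast to String, mirroring the issue
        }
    }

    static String perOutputResult() {
        ChainedMap toLength = lengthMap();
        ChainedMap toUpper = upperMap();
        PerOutputCollector out = new PerOutputCollector(List.of(toLength, toUpper));
        out.collect("hello");
        out.collect("flink");
        return toLength.emitted + " " + toUpper.emitted;
    }

    public static void main(String[] args) {
        System.out.println("shared record fails: " + sharedRecordFails());
        System.out.println("per-output records: " + perOutputResult());
    }
}
```

Running `main` should print `shared record fails: true` followed by `per-output records: [5, 5] [HELLO, FLINK]`. The dedicated records still point at the same value object, so, as the comment notes, an operator that mutates its input value would still affect its siblings; the sketch only avoids that because `String` is immutable.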