[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15302127#comment-15302127 ]
Stephan Ewen commented on FLINK-3974:
-------------------------------------

Yes, that is a pretty clear bug.

I guess the best workaround for now is to disable the object reuse mode. Object reuse does not really work well in the DataStream API (streaming) right now; it works pretty well in the DataSet API.

Another quick workaround is to not chain the two different map functions ({{.disableChaining()}}).

The solution should be quite straightforward, though:
- Do not chain "splitting" flows any more. I would actually like that solution. For splitting flows, it seems like a good heuristic to start a new chain/thread by default.
- Each collector should use its own dedicated stream record. That would at least circumvent the ClassCastException, but it would still be dangerous if the mappers actually alter the events.

> enableObjectReuse fails when an operator chains to multiple downstream operators
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-3974
>                 URL: https://issues.apache.org/jira/browse/FLINK-3974
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3
>            Reporter: B Wyatt
>         Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
>     .map(MapFunction<A,B>...)
>     .addSink(...);
> input
>     .map(MapFunction<A,C>...)
>     .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}}, which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}}, which mutates the value stored in the StreamRecord<>.
>
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the {{StreamRecord<A>}} to the second map operation, it is actually a {{StreamRecord<B>}}, and the pipeline behaves as if the two map operations were serial instead of parallel.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
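The failure mode described in the issue can be reproduced without Flink. The following is a minimal, self-contained Java sketch, not Flink's actual code: `MutableRecord`, `ChainedMap`, `collectShared` and `collectDedicated` are hypothetical stand-ins for `StreamRecord`, a chained map operator under object reuse, and the chaining output's fan-out loop. `collectShared` passes one record to every downstream operator, so the first map's `replace()` changes what the second map sees and a `ClassCastException` follows. `collectDedicated` models the second proposed fix, giving each downstream operator its own record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ObjectReuseDemo {

    // Hypothetical stand-in for Flink's StreamRecord: a mutable holder whose
    // replace() overwrites the wrapped value in place and returns the same
    // instance, re-typed to the new value type.
    static final class MutableRecord<T> {
        private Object value;

        MutableRecord(T value) { this.value = value; }

        @SuppressWarnings("unchecked")
        T getValue() { return (T) value; }

        @SuppressWarnings("unchecked")
        <X> MutableRecord<X> replace(X newValue) {
            this.value = newValue;              // mutates the holder seen by the caller
            return (MutableRecord<X>) this;
        }
    }

    // Hypothetical chained map operator that, like object-reuse mode,
    // writes its result back into the incoming record instead of allocating.
    static final class ChainedMap<A, B> {
        private final Function<A, B> fn;
        final List<B> sink = new ArrayList<>();

        ChainedMap(Function<A, B> fn) { this.fn = fn; }

        void process(MutableRecord<A> rec) {
            B out = fn.apply(rec.getValue());
            MutableRecord<B> reused = rec.replace(out);
            sink.add(reused.getValue());
        }
    }

    // Buggy fan-out: every downstream operator receives the SAME record, so
    // the second operator sees the first operator's output type.
    static <A> void collectShared(MutableRecord<A> rec, List<ChainedMap<A, ?>> outputs) {
        for (ChainedMap<A, ?> op : outputs) {
            op.process(rec);
        }
    }

    // Fan-out with a dedicated record per downstream operator. The wrapped
    // value object is still shared, so a mapper that mutates it in place
    // would still affect the other operators.
    static <A> void collectDedicated(A value, List<ChainedMap<A, ?>> outputs) {
        for (ChainedMap<A, ?> op : outputs) {
            op.process(new MutableRecord<>(value));
        }
    }

    public static void main(String[] args) {
        ChainedMap<String, Integer> lengths = new ChainedMap<>(String::length);
        ChainedMap<String, String> upper = new ChainedMap<>(String::toUpperCase);
        List<ChainedMap<String, ?>> outputs = List.of(lengths, upper);

        collectDedicated("flink", outputs);
        System.out.println("dedicated: " + lengths.sink + " " + upper.sink);

        try {
            collectShared(new MutableRecord<>("flink"), outputs);
        } catch (ClassCastException e) {
            System.out.println("shared: " + e);
        }
    }
}
```

With the shared record, the first map (`String::length`) replaces the `String` with an `Integer`, and the second map (`String::toUpperCase`) then fails when that `Integer` is cast to `String`, the analogue of "B cannot be cast to A". The dedicated-record variant avoids the cast failure, but, as noted in the comment above, both operators still receive the same value object, so it stays unsafe if a mapper mutates the event itself.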