[ https://issues.apache.org/jira/browse/FLINK-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350900#comment-15350900 ]
ASF GitHub Bot commented on FLINK-4113:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2156#discussion_r68566231

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java ---
    @@ -89,7 +89,7 @@ public void collect(IT record) {
             numRecordsIn.inc();
             try {
                 if (base == null) {
    -                base = objectReuseEnabled ? record : serializer.copy(record);
    +                base = serializer.copy(record);
                 } else {
                     base = objectReuseEnabled ? reducer.reduce(base, record) : serializer.copy(reducer.reduce(base, record));
    --- End diff --

    We fixed this in FLINK-3340 for non-chained reduce drivers (where the driver chooses the object to deserialize into) but for chained drivers we cannot prevent one UDF from overwriting an object from a previous UDF. If you look in {{OverwriteObjects.java}} you will see {{testReduce}} fail.


> Always copy first value in ChainedAllReduceDriver
> -------------------------------------------------
>
>                 Key: FLINK-4113
>                 URL: https://issues.apache.org/jira/browse/FLINK-4113
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.1.0, 1.0.4
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>            Priority: Critical
>             Fix For: 1.1.0, 1.0.4
>
>
> {{ChainedAllReduceDriver.collect}} must copy the first record even when
> object reuse is enabled or {{base}} may later point to the same object as
> {{record}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
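
The aliasing described in the issue can be illustrated with a small standalone sketch. It does not use Flink classes: {{Counter}}, {{ChainedReduce}} and {{run}} are hypothetical names, {{Counter.copy()}} stands in for {{serializer.copy(record)}}, and a {{BinaryOperator}} stands in for the user's reduce function. Only the object-reuse-enabled path of {{collect}} is modelled, under the assumption that the upstream operator overwrites a single record instance between calls.

```java
import java.util.function.BinaryOperator;

public class FirstRecordAliasingSketch {

    /** Hypothetical mutable record type (not a Flink class). */
    static final class Counter {
        long value;
        Counter(long value) { this.value = value; }
        Counter copy() { return new Counter(value); } // stands in for serializer.copy(record)
    }

    /** Simplified stand-in for the chained driver's collect logic with object reuse enabled. */
    static final class ChainedReduce {
        private final boolean copyFirst;
        private final BinaryOperator<Counter> reducer;
        private Counter base;

        ChainedReduce(boolean copyFirst, BinaryOperator<Counter> reducer) {
            this.copyFirst = copyFirst;
            this.reducer = reducer;
        }

        void collect(Counter record) {
            if (base == null) {
                // copyFirst == false models the old behaviour: base aliases the incoming record.
                base = copyFirst ? record.copy() : record;
            } else {
                base = reducer.apply(base, record);
            }
        }

        long result() { return base.value; }
    }

    /** Feeds the values 1, 2, 4 through one reused record instance and returns the reduced sum. */
    static long run(boolean copyFirst) {
        // Reducer that accumulates into its first argument, as is common with object reuse.
        ChainedReduce driver = new ChainedReduce(copyFirst, (a, b) -> { a.value += b.value; return a; });
        Counter reused = new Counter(0); // the upstream operator overwrites this object each time
        for (long v : new long[] {1, 2, 4}) {
            reused.value = v;
            driver.collect(reused);
        }
        return driver.result();
    }

    public static void main(String[] args) {
        System.out.println("first record not copied: " + run(false)); // prints 8 (wrong)
        System.out.println("first record copied:     " + run(true));  // prints 7 (1 + 2 + 4)
    }
}
```

Without the copy, {{base}} and the upstream record are the same object, so every later write by the upstream operator also overwrites the running result and the reducer ends up combining the record with itself. Copying only the first record is enough in this sketch because the reducer keeps returning the copied {{base}} afterwards.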