GitHub user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5737#discussion_r176212439

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -763,48 +764,32 @@ public Executor getFutureExecutor() {
             return userAccumulators;
         }
     
    -    /**
    -     * Gets the accumulator results.
    -     */
    -    public Map<String, Object> getAccumulators() {
    -
    -        Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
    -
    -        Map<String, Object> result = new HashMap<>();
    -        for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
    -            result.put(entry.getKey(), entry.getValue().getLocalValue());
    -        }
    -
    -        return result;
    -    }
    -
         /**
          * Gets a serialized accumulator map.
          * @return The accumulator map with serialized accumulator values.
          */
         @Override
         public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
    +        return aggregateUserAccumulators()
    +            .entrySet()
    +            .stream()
    +            .collect(Collectors.toMap(Map.Entry::getKey, entry -> serializeAccumulator(entry.getKey(), entry.getValue())));
    +    }
     
    -        Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
    -
    -        Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
    -        for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
    -
    +    private static SerializedValue<Object> serializeAccumulator(String name, Accumulator<?, ?> accumulator) {
    +        try {
    +            if (accumulator instanceof FailedAccumulator) {
    +                return new SerializedValue<>(accumulator);
    +            }
    +            return new SerializedValue<>(accumulator.getLocalValue());
    +        } catch (IOException ioe) {
    +            LOG.error("Could not serialize accumulator " + name + '.', ioe);
                 try {
    -                final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    -                result.put(entry.getKey(), serializedValue);
    -            } catch (IOException ioe) {
    -                LOG.error("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    -
    -                try {
    -                    result.put(entry.getKey(), new SerializedValue<>(new FailedAccumulatorSerialization(ioe)));
    -                } catch (IOException e) {
    -                    throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e);
    -                }
    +                return new SerializedValue<>(new FailedAccumulator(ioe));
    --- End diff --

Hmm, the problem I see here is that in the success case we store the accumulator value, whereas in the failure case we store an `Accumulator` instance. The user expects the accumulator value and will cast it accordingly, so they will never call the `Accumulator` methods that would throw the exception (see `JobExecutionResult` for how the user interacts with the accumulator values). In that sense, the previous solution of storing a `FailedAccumulatorSerialization` was also flawed.

What we would actually have to store in the `SerializedValue` is something like an `Either<Throwable, V>`. On the client side, when accessing the `accumulatorsValueMap`, we should check whether the value is a left or a right and, in the left case, throw the exception.

Alternatively, we could say that an accumulator failure always results in a job failure. This would mean that in `JobMaster#jobStatusChanged` we generate a failed `ArchivedExecutionGraph` whenever an accumulator fails.
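To make the `Either<Throwable, V>` idea more concrete, here is a minimal, self-contained sketch. It is not Flink code. It assumes plain Java serialization in place of Flink's `SerializedValue`, and `AccumulatorResult`, `serializeAccumulator(Object)` and `getAccumulatorResult(Map, String)` are hypothetical names chosen for illustration. The job side always serializes the same wrapper type, and the client side throws in the left (failure) case instead of handing an unexpected object to user code that casts it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorEitherSketch {

    /** Hypothetical Either<Throwable, V>-style holder: exactly one side is set. */
    static final class AccumulatorResult implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Throwable failure; // "left": why the value is unavailable
        private final Object value;      // "right": the accumulator's local value

        private AccumulatorResult(Throwable failure, Object value) {
            this.failure = failure;
            this.value = value;
        }

        static AccumulatorResult success(Object value) { return new AccumulatorResult(null, value); }

        static AccumulatorResult failure(Throwable failure) { return new AccumulatorResult(failure, null); }

        /** Returns the value in the right case; rethrows the recorded failure in the left case. */
        Object getOrThrow() {
            if (failure != null) {
                throw new IllegalStateException("Accumulator could not be serialized.", failure);
            }
            return value;
        }
    }

    // Assumption: plain Java serialization stands in for Flink's SerializedValue.
    static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Job side (hypothetical): every entry is wrapped, so the client never gets a bare Accumulator. */
    static byte[] serializeAccumulator(Object localValue) {
        try {
            return toBytes(AccumulatorResult.success(localValue));
        } catch (IOException ioe) {
            try {
                return toBytes(AccumulatorResult.failure(ioe));
            } catch (IOException e) {
                throw new UncheckedIOException("Could not serialize the accumulator failure.", e);
            }
        }
    }

    /** Client side (hypothetical): unwrap the entry, throwing in the left case. */
    @SuppressWarnings("unchecked")
    static <T> T getAccumulatorResult(Map<String, byte[]> serialized, String name) {
        AccumulatorResult result = (AccumulatorResult) fromBytes(serialized.get(name));
        return (T) result.getOrThrow();
    }

    public static void main(String[] args) {
        Map<String, byte[]> serialized = new HashMap<>();
        serialized.put("count", serializeAccumulator(42L));
        // java.lang.Object is not Serializable, so this entry records a NotSerializableException.
        serialized.put("broken", serializeAccumulator(new Object()));

        Long count = getAccumulatorResult(serialized, "count");
        System.out.println("count = " + count);
        try {
            getAccumulatorResult(serialized, "broken");
        } catch (IllegalStateException e) {
            System.out.println("broken: " + e.getCause());
        }
    }
}
```

With this shape, a user asking for `count` gets a `Long` as expected, while asking for `broken` surfaces the original serialization failure as the cause, rather than the user most likely running into a `ClassCastException` when casting an unexpected failure object.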
---