Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176212439
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -763,48 +764,32 @@ public Executor getFutureExecutor() {
                return userAccumulators;
        }
     
    -   /**
    -    * Gets the accumulator results.
    -    */
    -   public Map<String, Object> getAccumulators() {
    -
    -           Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
    -
    -           Map<String, Object> result = new HashMap<>();
    -           for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
    -                   result.put(entry.getKey(), entry.getValue().getLocalValue());
    -           }
    -
    -           return result;
    -   }
    -
        /**
         * Gets a serialized accumulator map.
         * @return The accumulator map with serialized accumulator values.
         */
        @Override
        public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
    +           return aggregateUserAccumulators()
    +                   .entrySet()
    +                   .stream()
    +                   .collect(Collectors.toMap(Map.Entry::getKey, entry -> serializeAccumulator(entry.getKey(), entry.getValue())));
    +   }
     
    -           Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
    -
    -           Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
    -           for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
    -
    +   private static SerializedValue<Object> serializeAccumulator(String name, Accumulator<?, ?> accumulator) {
    +           try {
    +                   if (accumulator instanceof FailedAccumulator) {
    +                           return new SerializedValue<>(accumulator);
    +                   }
    +                   return new SerializedValue<>(accumulator.getLocalValue());
    +           } catch (IOException ioe) {
    +                   LOG.error("Could not serialize accumulator " + name + '.', ioe);
                        try {
    -                           final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    -                           result.put(entry.getKey(), serializedValue);
    -                   } catch (IOException ioe) {
    -                           LOG.error("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    -
    -                           try {
    -                                   result.put(entry.getKey(), new SerializedValue<>(new FailedAccumulatorSerialization(ioe)));
    -                           } catch (IOException e) {
    -                                   throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e);
    -                           }
    +                           return new SerializedValue<>(new FailedAccumulator(ioe));
    --- End diff --
    
    Hmm, the problem I see here is that in the success case we store the
    accumulator value, whereas in the failure case we store an `Accumulator`
    instance. The user will expect the accumulator value and cast it
    accordingly, so they will never call the `Accumulator` methods that would
    throw the exception (see `JobExecutionResult` for how the user interacts
    with the accumulator values). In that sense, the previous solution of
    storing a `FailedAccumulatorSerialization` was also flawed.
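
    To make the mismatch concrete, here is a minimal, self-contained sketch
    that uses no Flink classes. `FailedAccumulatorStandIn` and
    `MixedResultDemo` are hypothetical names, and the unchecked generic getter
    is only modeled loosely on how `JobExecutionResult` hands out accumulator
    results; it is not Flink's code.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the PR's FailedAccumulator: it carries the
// serialization error and would only throw it if one of its methods were invoked.
final class FailedAccumulatorStandIn {
    private final Throwable cause;

    FailedAccumulatorStandIn(Throwable cause) {
        this.cause = cause;
    }

    // Never reached by user code that simply casts the result to the value type.
    Object getLocalValue() {
        throw new IllegalStateException("Accumulator could not be serialized.", cause);
    }
}

final class MixedResultDemo {
    // Success case stores the plain value; failure case stores the stand-in object.
    static Map<String, Object> results() {
        Map<String, Object> results = new HashMap<>();
        results.put("count", 42);
        results.put("broken", new FailedAccumulatorStandIn(new IOException("not serializable")));
        return results;
    }

    // Unchecked generic getter (hypothetical): the cast to the type the user
    // expects is inserted by the compiler at the call site, not here.
    @SuppressWarnings("unchecked")
    static <T> T getAccumulatorResult(Map<String, Object> results, String name) {
        return (T) results.get(name);
    }
}
```

    Asking for `count` as an `Integer` works, but asking for `broken` the same
    way fails with a `ClassCastException` at the call site, and the wrapped
    `IOException` never reaches the user.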
    
    What we would actually have to store in the `SerializedValue` is something
    like an `Either<Throwable, V>`. On the client side, when accessing the
    `accumulatorsValueMap`, we should check whether the value is a left or a
    right and, in the left case, throw the exception.
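
    A rough sketch of the `Either<Throwable, V>` idea, assuming plain Java
    serialization stands in for `SerializedValue` and a `Map<String, byte[]>`
    stands in for the `accumulatorsValueMap`. `ValueOrFailure`,
    `EitherAccumulatorDemo` and their methods are hypothetical, and rethrowing
    the left side wrapped in an `IllegalStateException` is just one possible
    choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;

// Hypothetical stand-in for Either<Throwable, V>: exactly one side is non-null.
final class ValueOrFailure<V> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Throwable failure; // "left": why the accumulator failed
    private final V value;           // "right": the accumulator's local value

    private ValueOrFailure(Throwable failure, V value) {
        this.failure = failure;
        this.value = value;
    }

    static <V> ValueOrFailure<V> left(Throwable failure) {
        return new ValueOrFailure<>(failure, null);
    }

    static <V> ValueOrFailure<V> right(V value) {
        return new ValueOrFailure<>(null, value);
    }

    // Client side: return the value (right) or rethrow the stored failure (left).
    V getOrThrow(String name) {
        if (failure != null) {
            throw new IllegalStateException("Accumulator " + name + " failed.", failure);
        }
        return value;
    }
}

final class EitherAccumulatorDemo {
    private static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Server side (sketch): always ship the wrapper, never a bare value.
    static byte[] serializeAccumulator(Object localValue) {
        try {
            return serialize(ValueOrFailure.right(localValue));
        } catch (IOException ioe) {
            try {
                return serialize(ValueOrFailure.left(ioe));
            } catch (IOException e) {
                throw new UncheckedIOException("Could not serialize the serialization failure.", e);
            }
        }
    }

    // Client side (sketch): deserialize the wrapper, then branch on left vs. right.
    @SuppressWarnings("unchecked")
    static <V> V getAccumulatorResult(Map<String, byte[]> serialized, String name) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized.get(name)))) {
            return ((ValueOrFailure<V>) in.readObject()).getOrThrow(name);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize accumulator " + name + '.', e);
        }
    }
}
```

    Because the stored type is always the wrapper, the client can branch on it
    before anything reaches the user's cast: an entry serialized from `42`
    comes back as `42`, while an entry whose value was not serializable
    rethrows the original `NotSerializableException` as the cause.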
    
    Alternatively, we could say that an accumulator failure always results in
    a job failure. This would mean that in `JobMaster#jobStatusChanged` we
    generate a failed `ArchivedExecutionGraph` in case of an accumulator
    failure.
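
    For the alternative, a minimal sketch of what "an accumulator failure
    fails the job" could look like. `ArchivedResultSketch` and `onJobFinished`
    are hypothetical simplifications, not the actual
    `JobMaster#jobStatusChanged` or `ArchivedExecutionGraph` API; the real
    change would build a failed `ArchivedExecutionGraph` instead.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for an archived job result;
// it is NOT Flink's ArchivedExecutionGraph.
final class ArchivedResultSketch {
    enum Status { FINISHED, FAILED }

    final Status status;
    final Map<String, byte[]> accumulators; // serialized values, empty when FAILED
    final Throwable failureCause;           // null when FINISHED

    private ArchivedResultSketch(Status status, Map<String, byte[]> accumulators, Throwable failureCause) {
        this.status = status;
        this.accumulators = accumulators;
        this.failureCause = failureCause;
    }

    private static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Sketch of the hook that would run when the job reaches FINISHED: a single
    // accumulator that cannot be serialized turns the whole result into FAILED.
    static ArchivedResultSketch onJobFinished(Map<String, Object> accumulatorValues) {
        Map<String, byte[]> serialized = new HashMap<>();
        for (Map.Entry<String, Object> entry : accumulatorValues.entrySet()) {
            try {
                serialized.put(entry.getKey(), serialize(entry.getValue()));
            } catch (IOException ioe) {
                Throwable cause = new IllegalStateException(
                        "Could not serialize accumulator " + entry.getKey() + '.', ioe);
                return new ArchivedResultSketch(Status.FAILED, Collections.<String, byte[]>emptyMap(), cause);
            }
        }
        return new ArchivedResultSketch(Status.FINISHED, serialized, null);
    }
}
```

    The trade-off is that one non-serializable accumulator value turns an
    otherwise successful job into a failed one, but the client never has to
    deal with a partially valid accumulator map.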

