[ 
https://issues.apache.org/jira/browse/FLINK-2645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14737280#comment-14737280
 ] 

ASF GitHub Bot commented on FLINK-2645:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1112#discussion_r39073704
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
    @@ -922,19 +922,18 @@ public boolean updateState(TaskExecutionState state) {
                                case RUNNING:
                                        return attempt.switchToRunning();
                                case FINISHED:
    -                                   Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>> flinkAccumulators = null;
    -                                   Map<String, Accumulator<?, ?>> 
userAccumulators = null;
                                        try {
                                                AccumulatorSnapshot 
accumulators = state.getAccumulators();
    -                                           flinkAccumulators = 
accumulators.deserializeFlinkAccumulators();
    -                                           userAccumulators = 
accumulators.deserializeUserAccumulators(userClassLoader);
    +                                           Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>> flinkAccumulators =
    +                                                   
accumulators.deserializeFlinkAccumulators();
    +                                           Map<String, Accumulator<?, ?>> 
userAccumulators =
    +                                                   
accumulators.deserializeUserAccumulators(userClassLoader);
    +                                           
attempt.markFinished(flinkAccumulators, userAccumulators);
                                        }
                                        catch (Exception e) {
    -                                           // we do not fail the job on 
deserialization problems of accumulators, but only log
                                                LOG.error("Failed to 
deserialize final accumulator results.", e);
    +                                           attempt.markFailed(new 
SerializedThrowable(e));
    --- End diff --
    
    I don't think you need a SerializedThrowable here. The exceptions are 
wrapped in serialized throwables when they are transported via actor messages.


> Accumulator exceptions are not properly forwarded
> -------------------------------------------------
>
>                 Key: FLINK-2645
>                 URL: https://issues.apache.org/jira/browse/FLINK-2645
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 0.10
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 0.10
>
>
> Exceptions in accumulator implementations are not properly forwarded from the 
> JobManager to the Client.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to