Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/934#discussion_r35635395
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -338,16 +332,51 @@ class JobManager(
                 // is the client waiting for the job result?
                 newJobStatus match {
                   case JobStatus.FINISHED =>
    -                val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
    -                  executionGraph.getAccumulatorsSerialized
    +
    +                val jobConfig = currentJobs.getOrElse(jobID,
    +                  throw new RuntimeException("Unknown Job: " + jobID))._1.getJobConfiguration
    +
    +                val smallAccumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
    +                  executionGraph.getSmallAccumulatorsContentSerialized
                     } catch {
                       case e: Exception =>
                         log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
                         Collections.emptyMap()
                     }
    -                val result = new SerializedJobExecutionResult(jobID, jobInfo.duration,
    -                                                              accumulatorResults)
    -                jobInfo.client ! decorateMessage(JobResultSuccess(result))
    +
    +                var largeAccumulatorResults: java.util.Map[String, java.util.List[BlobKey]] =
    +                  executionGraph.aggregateLargeUserAccumulatorBlobKeys()
    +
    +                /*
    +                * The following covers the case where partial accumulator results are small, but
    +                * when aggregated, they become big. In this case, this happens at the JobManager,
    +                * and this code is responsible for detecting it, storing the oversized result in
    +                * the BlobCache, and informing the Client accordingly.
    +                * */
    +
    +                val totalSize: Long = smallAccumulatorResults.asScala.map(_._2.getSizeInBytes).sum
    +                if (totalSize > AkkaUtils.getLargeAccumulatorThreshold(jobConfig)) {
    +                  // given that the client is going to do the final merging, we serialize and
    +                  // store the accumulator objects, not only the content
    +                  val serializedSmallAccumulators = executionGraph.getSmallAccumulatorsSerialized
    +
    +                  // store the accumulators in the blobCache and get the keys.
    +                  val newBlobKeys = LargeAccumulatorHelper.storeSerializedAccumulatorsToBlobCache(
    +                    getBlobCacheServerAddress, serializedSmallAccumulators)
    +                  smallAccumulatorResults.clear()
    +
    +                  // and update the blobKeys to send to the client.
    +                  largeAccumulatorResults = executionGraph.
    +                    addLargeUserAccumulatorBlobKeys(largeAccumulatorResults, newBlobKeys)
    +
    +                } else {
    +                  // do nothing
    +                  java.util.Collections.emptyMap()
    +                }
    --- End diff --
    
    The `else` branch can probably be removed, since its `Collections.emptyMap()` result is never used. You could also make the `if`/`else` an expression whose value is assigned directly to `largeAccumulatorResults`; then it could be a `val` instead of a `var`.

