LuciferYang commented on code in PR #50474:
URL: https://github.com/apache/spark/pull/50474#discussion_r2024025449


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -1014,39 +1014,44 @@ final class ShuffleBlockFetcherIterator(
           // a SuccessFetchResult or a FailureFetchResult.
           result = null
 
-          case PushMergedLocalMetaFetchResult(
-            shuffleId, shuffleMergeId, reduceId, bitmaps, localDirs) =>
-            // Fetch push-merged-local shuffle block data as multiple shuffle chunks
-            val shuffleBlockId = ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId)
-            try {
-              val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId,
-                localDirs)
-              // Since the request for local block meta completed successfully, numBlocksToFetch
-              // is decremented.
-              numBlocksToFetch -= 1
-              // Update total number of blocks to fetch, reflecting the multiple local shuffle
-              // chunks.
-              numBlocksToFetch += bufs.size
-              bufs.zipWithIndex.foreach { case (buf, chunkId) =>
-                buf.retain()
-                val shuffleChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId,
-                  chunkId)
-                pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
-                results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
-                  pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
-                  isNetworkReqDone = false))
-              }
-            } catch {
-              case e: Exception =>
-                // If we see an exception with reading push-merged-local index file, we fallback
-                // to fetch the original blocks. We do not report block fetch failure
-                // and will continue with the remaining local block read.
-                logWarning("Error occurred while reading push-merged-local index, " +
-                  "prepare to fetch the original blocks", e)
-                pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
-                  shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
+        case PushMergedLocalMetaFetchResult(

Review Comment:
   1. The MiMa check failure indicates that binary forward compatibility has
been broken. I don't think we should compromise it for the sake of a single
compilation parameter.
   2. Additionally, please refrain from reformatting code that the patch does
not actually change. Our principle is to keep the scope of a patch as small as
possible. Except in the modules `sql/api`, `sql/connect/common`,
`sql/connect/server`, and `sql/connect/client/jvm`, do not use the scalafmt
tool to format the code.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


