LuciferYang commented on code in PR #50474:
URL: https://github.com/apache/spark/pull/50474#discussion_r2041365069

##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -1014,39 +1014,44 @@ final class ShuffleBlockFetcherIterator(
           // a SuccessFetchResult or a FailureFetchResult.
           result = null

-        case PushMergedLocalMetaFetchResult(
-            shuffleId, shuffleMergeId, reduceId, bitmaps, localDirs) =>
-          // Fetch push-merged-local shuffle block data as multiple shuffle chunks
-          val shuffleBlockId = ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId)
-          try {
-            val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId,
-              localDirs)
-            // Since the request for local block meta completed successfully, numBlocksToFetch
-            // is decremented.
-            numBlocksToFetch -= 1
-            // Update total number of blocks to fetch, reflecting the multiple local shuffle
-            // chunks.
-            numBlocksToFetch += bufs.size
-            bufs.zipWithIndex.foreach { case (buf, chunkId) =>
-              buf.retain()
-              val shuffleChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId,
-                chunkId)
-              pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
-              results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
-                pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
-                isNetworkReqDone = false))
-            }
-          } catch {
-            case e: Exception =>
-              // If we see an exception with reading push-merged-local index file, we fallback
-              // to fetch the original blocks. We do not report block fetch failure
-              // and will continue with the remaining local block read.
-              logWarning("Error occurred while reading push-merged-local index, " +
-                "prepare to fetch the original blocks", e)
-              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
-                shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
+        case PushMergedLocalMetaFetchResult(

Review Comment:
1. The code for Spark 4.0 was frozen a long time ago, and this pull request does not comply with the backport rules.
2. I haven't continued reviewing it or attempting to merge it recently, to avoid unnecessary code conflicts before the release of version 4.0.
3. I still believe that until there is a clear Scala 3 upgrade plan, we should not introduce binary incompatibility through compilation options or syntax changes. To be clear, I am not opposed to upgrading to Scala 3, but it should follow a well-defined plan.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
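For readers skimming the quoted hunk, the core bookkeeping is that one push-merged local block is swapped out for its individual shuffle chunks: the pending-block counter is decremented once for the merged block and then incremented by the number of chunk buffers returned. A minimal sketch of that accounting, using hypothetical names (`PendingFetch`, `expandMergedBlock`) rather than Spark's actual API:

```scala
// Hypothetical, simplified sketch of the counter logic in the quoted hunk.
// `PendingFetch` and `expandMergedBlock` are illustrative names only; the
// real code mutates `numBlocksToFetch` inside ShuffleBlockFetcherIterator.
final class PendingFetch(private var numBlocksToFetch: Int) {

  def remaining: Int = numBlocksToFetch

  // On a successful local meta fetch: the single push-merged block is
  // accounted for (-1), but each of its chunk buffers becomes a new block
  // to fetch (+chunkSizes.size), mirroring the `-= 1` / `+= bufs.size`
  // pair in the diff.
  def expandMergedBlock(chunkSizes: Seq[Long]): Unit = {
    numBlocksToFetch -= 1
    numBlocksToFetch += chunkSizes.size
  }
}

// Usage: one merged block expanded into three local shuffle chunks.
val pending = new PendingFetch(5)
pending.expandMergedBlock(Seq(64L, 128L, 256L))
println(pending.remaining) // 5 - 1 + 3 = 7
```

Note that on the exception path in the diff this expansion never happens; the fallback fetch re-requests the original (unmerged) blocks instead.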