joan38 commented on code in PR #50474:
URL: https://github.com/apache/spark/pull/50474#discussion_r2044980777
##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -1014,39 +1014,44 @@ final class ShuffleBlockFetcherIterator(
// a SuccessFetchResult or a FailureFetchResult.
result = null
- case PushMergedLocalMetaFetchResult(
- shuffleId, shuffleMergeId, reduceId, bitmaps, localDirs) =>
- // Fetch push-merged-local shuffle block data as multiple shuffle chunks
- val shuffleBlockId = ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId)
- try {
- val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId,
- localDirs)
- // Since the request for local block meta completed successfully, numBlocksToFetch
- // is decremented.
- numBlocksToFetch -= 1
- // Update total number of blocks to fetch, reflecting the multiple local shuffle
- // chunks.
- numBlocksToFetch += bufs.size
- bufs.zipWithIndex.foreach { case (buf, chunkId) =>
- buf.retain()
- val shuffleChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId,
- chunkId)
- pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
- results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
- pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
- isNetworkReqDone = false))
- }
- } catch {
- case e: Exception =>
- // If we see an exception with reading push-merged-local index file, we fallback
- // to fetch the original blocks. We do not report block fetch failure
- // and will continue with the remaining local block read.
- logWarning("Error occurred while reading push-merged-local index, " +
- "prepare to fetch the original blocks", e)
- pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
- shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
+ case PushMergedLocalMetaFetchResult(
Review Comment:
Ok @LuciferYang. So I'll wait for you guys to process Spark 4 and then
you'll ping back on this PR?
In the meantime I'd recommend watching [this
talk](https://youtu.be/Qeavi9M65Qw) from @bishabosha especially [the DataFrame
demo](https://youtu.be/Qeavi9M65Qw?si=tQxtybnOB9P9GMvk&t=1145) and [the example
project](https://github.com/bishabosha/scalar-2025/blob/main/dataframe/test/demo.scala).
Thanks for the help!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]