Looking at the code of ShuffleBlockFetcherIterator, it seems to load all the shuffle blocks needed by a task at once, and to release all of them only when the task finishes.
Please correct me if I'm wrong. If my understanding is correct, does this mean those shuffle blocks stay in memory even after some of them have been fully read? Is it possible to improve this? I do see a TODO in a code comment, but I'm not sure whether it refers to the same thing. Here is the code:

```scala
val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
try {
  // Decompress the whole block at once to detect any corruption, which could increase
  // the memory usage and potentially increase the chance of OOM.
  // TODO: manage the memory used here, and spill it into disk in case of OOM.
  Utils.copyStream(input, out)
  out.close()
  input = out.toChunkedByteBuffer.toInputStream(dispose = true)
} catch {
  case e: IOException =>
    buf.release()
    if (buf.isInstanceOf[FileSegmentManagedBuffer]
        || corruptedBlocks.contains(blockId)) {
      throwFetchFailedException(blockId, address, e)
    } else {
      logWarning(s"got an corrupted block $blockId from $address, fetch again", e)
      corruptedBlocks += blockId
      fetchRequests += FetchRequest(address, Array((blockId, size)))
      result = null
    }
} finally {
  // TODO: release the buf here to free memory earlier
  originalInput.close()
  in.close()
}
```

Also, if the shuffle blocks stay in memory for a long time, they may be promoted to the old generation and add to full-GC pressure.

Chrysan Wu
Phone: +86 17717640807
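To make the question concrete, here is a toy model in Scala comparing the two release strategies. It is not Spark code: `MemoryTracker`, `FetchedBuffer` and `ReleaseTiming` are hypothetical names, blocks are assumed to be fetched one at a time, and the actual reading of each block is left as a placeholder. It only counts the peak number of bytes held when buffers are released at task end versus as soon as each block has been read (what the "release the buf here" TODO appears to suggest).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model, NOT Spark's ShuffleBlockFetcherIterator.
// It only tracks how many bytes of fetched block buffers are still held.
final class MemoryTracker {
  private var live = 0L
  private var peakBytes = 0L
  def acquire(n: Long): Unit = { live += n; peakBytes = math.max(peakBytes, live) }
  def release(n: Long): Unit = live -= n
  def peak: Long = peakBytes
}

// A fetched block buffer (hypothetical) that can be released exactly once.
final class FetchedBuffer(val blockId: String, val size: Long, mem: MemoryTracker) {
  mem.acquire(size)
  private var released = false
  def release(): Unit = if (!released) { released = true; mem.release(size) }
}

object ReleaseTiming {
  // Strategy A (behaviour described in the post): every buffer stays
  // referenced until the task finishes, then all are released together.
  def releaseAtTaskEnd(sizes: Seq[Long]): Long = {
    val mem = new MemoryTracker
    val held = ArrayBuffer.empty[FetchedBuffer]
    sizes.zipWithIndex.foreach { case (size, i) =>
      val buf = new FetchedBuffer(s"block_$i", size, mem)
      held += buf // the block is read here, but the buffer is kept
    }
    held.foreach(_.release()) // task-completion cleanup
    mem.peak
  }

  // Strategy B (proposed enhancement): release each buffer as soon as
  // its block has been fully read.
  def releaseOnConsume(sizes: Seq[Long]): Long = {
    val mem = new MemoryTracker
    sizes.zipWithIndex.foreach { case (size, i) =>
      val buf = new FetchedBuffer(s"block_$i", size, mem)
      try {
        () // placeholder: the block is read here
      } finally buf.release()
    }
    mem.peak
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq(100L, 200L, 300L)
    println(s"release at task end: peak ${releaseAtTaskEnd(sizes)} bytes")
    println(s"release on consume:  peak ${releaseOnConsume(sizes)} bytes")
  }
}
```

With block sizes of 100, 200 and 300 bytes, holding every buffer until task end peaks at the sum of all sizes (600 bytes), while releasing each buffer after it is read peaks at the largest single block (300 bytes). The model says nothing about GC behaviour directly; it only shows how much longer-lived data the first strategy keeps around.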