attilapiros opened a new pull request, #50122:
URL: https://github.com/apache/spark/pull/50122

   Thanks to @yorksity, who reported this error and even provided a PR for it. 
   This solution is very different from https://github.com/apache/spark/pull/40883 
as `BlockManagerMasterEndpoint#getLocationsAndStatus()` needed some refactoring.
   
   ### What changes were proposed in this pull request?
   
   This PR fixes an error which manifests as the following exception:
   
   ```
   25/02/20 09:58:31 ERROR util.Utils: [Executor task launch worker for task 61.0 in stage 67.0 (TID 9391)]: Exception encountered
   java.lang.ArrayIndexOutOfBoundsException: 0
     at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:185) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2]
     at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23) ~[scala-library-2.12.15.jar:?]
     at scala.collection.immutable.List.foreach(List.scala:431) ~[scala-library-2.12.15.jar:?]
     at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:171) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2]
   ```
   
   This PR changes `BlockManagerMasterEndpoint#getLocationsAndStatus()`.
   
   The `BlockManagerMasterEndpoint#getLocationsAndStatus()` function returns 
an optional `BlockLocationsAndStatus` which consists of 3 parts:
    - `locations`: all the locations where the block can be found (as a 
sequence of block manager IDs)
    - `status`: one block status
    - `localDirs`: optional directory paths which can be used to read the block 
if it is found on the disk of an executor running on the same host
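   
   For orientation, the three parts listed above can be sketched as a small 
data structure. The types below are simplified stand-ins written for this 
illustration (the field sets of `BlockManagerId` and `BlockStatus` here are 
assumptions, and Spark's real classes carry more information), so this is a 
sketch of the shape only:
   
   ```scala
   object BlockLocationsSketch {
     // Simplified stand-ins for illustration; not Spark's real classes.
     final case class BlockManagerId(executorId: String, host: String, port: Int)
     final case class BlockStatus(memSize: Long, diskSize: Long)

     final case class BlockLocationsAndStatus(
         locations: Seq[BlockManagerId],    // every block manager holding the block
         status: BlockStatus,               // the status of ONE of those replicas
         localDirs: Option[Array[String]])  // set only for a same-host, on-disk replica
   }
   ```
   
   Note that `status` describes a single replica while `locations` lists all of 
them, which is why it matters which replica the status is taken from.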
    
   A block (an RDD block, a shuffle block or a torrent block) can be 
stored on many executors with different storage levels: disk or memory.
   
   This PR changes how the block status and the block manager ID used for 
`localDirs` are found, to guarantee that they belong together.
   
   ### Why are the changes needed?
   
   Before this PR, `BlockManagerMasterEndpoint#getLocationsAndStatus()` 
searched for the block status (`status`) and the `localDirs` separately. The 
block status was simply taken from the very first location where the block can 
be found. This way it could easily happen that the block status represented an 
in-memory block (where the disk size is 0 as it is stored in memory) while 
`localDirs` was filled in based on a host-local block instance which was 
stored on disk.
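   
   The mismatch described above can be reproduced with a minimal sketch. All 
names here (`oldLookup`, `pairedLookup`, the simplified `BlockManagerId` and 
`BlockStatus`) are hypothetical and exist only for this illustration; the 
actual implementation in the PR may differ. The sketch assumes that "stored on 
disk" can be modelled as `diskSize > 0`:
   
   ```scala
   object StatusSelectionSketch {
     // Simplified stand-ins for illustration; not Spark's real classes.
     final case class BlockManagerId(executorId: String, host: String)
     final case class BlockStatus(memSize: Long, diskSize: Long)

     type Replica = (BlockManagerId, BlockStatus)
     type Result = (BlockStatus, Option[BlockManagerId])

     // Old behaviour as described above: the status comes from the first location,
     // while the host-local on-disk replica is searched for independently.
     def oldLookup(replicas: Seq[Replica], requesterHost: String): Option[Result] =
       replicas.headOption.map { case (_, firstStatus) =>
         val localDiskReplica = replicas.collectFirst {
           case (id, st) if id.host == requesterHost && st.diskSize > 0 => id
         }
         (firstStatus, localDiskReplica)
       }

     // One way to keep them paired: pick the host-local on-disk replica first and
     // report ITS status; fall back to the first location when there is none.
     def pairedLookup(replicas: Seq[Replica], requesterHost: String): Option[Result] = {
       val localDiskReplica = replicas.find { case (id, st) =>
         id.host == requesterHost && st.diskSize > 0
       }
       localDiskReplica
         .map { case (id, st) => (st, Option(id)) }
         .orElse(replicas.headOption.map { case (_, st) =>
           (st, Option.empty[BlockManagerId])
         })
     }

     def main(args: Array[String]): Unit = {
       val inMemory = BlockManagerId("exec-1", "host-a") -> BlockStatus(100L, 0L)
       val onDisk = BlockManagerId("exec-2", "host-a") -> BlockStatus(0L, 100L)
       val replicas = Seq(inMemory, onDisk)

       // Status reports diskSize 0, yet a disk replica is selected for localDirs.
       println(oldLookup(replicas, "host-a"))
       // Status and selected replica belong to the same block manager.
       println(pairedLookup(replicas, "host-a"))
     }
   }
   ```
   
   With the in-memory replica listed first, `oldLookup` combines a status whose 
disk size is 0 with the on-disk replica on `exec-2`, whereas `pairedLookup` 
returns the status of `exec-2` itself.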
   
   This situation can be very frequent, but it only causes the above exception 
when encryption is on (`spark.io.encryption.enabled=true`), because for an 
unencrypted block the whole file containing the block is read, see
   
https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1244
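   
   Why the mismatch only hurts with encryption can be illustrated with a 
deliberately simplified model. `bytesToRead` is a hypothetical helper, not a 
Spark API, and it assumes that the encrypted path sizes its read from the disk 
size reported in the block status (the text above only states that the 
unencrypted path reads the whole file):
   
   ```scala
   object ReadLengthSketch {
     // Hypothetical helper for illustration; not a Spark API.
     // Models how many bytes a host-local disk read would cover.
     def bytesToRead(
         encryptionEnabled: Boolean,
         reportedDiskSize: Long,
         fileLength: Long): Long =
       if (encryptionEnabled) reportedDiskSize // trusts the size from the block status
       else fileLength                         // reads the whole file, ignoring the status
   }
   ```
   
   Under that assumption, a status taken from an in-memory replica (disk size 
0) yields a zero-length read only when encryption is enabled.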
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   A unit test is provided. 
   
   I had to play with the number of block managers and the order of the blocks 
as the block status order depends on a `HashSet`, see:
   ```
     private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
   ```
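   
   As a small illustration of why the order is hard to control, the sketch 
below prints the iteration order of a `mutable.HashSet` next to the insertion 
order. The executor ID strings and the `iterationOrder` helper are made up for 
this example; the point is only that iteration is driven by hashing, so the 
"first" location need not be the first one registered:
   
   ```scala
   import scala.collection.mutable

   object HashSetOrderSketch {
     // Returns the elements in the order the HashSet iterates over them.
     def iterationOrder(inserted: Seq[String]): Seq[String] = {
       val locations = mutable.HashSet.empty[String]
       inserted.foreach(locations += _)
       locations.toSeq
     }

     def main(args: Array[String]): Unit = {
       val inserted = Seq("exec-3", "exec-1", "exec-2")
       val iterated = iterationOrder(inserted)

       // The two lines may differ: a HashSet makes no insertion-order guarantee.
       println(s"inserted: ${inserted.mkString(", ")}")
       println(s"iterated: ${iterated.mkString(", ")}")
       println(s"head:     ${iterated.head}")
     }
   }
   ```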
   
   The new test was also run against the old code, where it fails:
   ```
   BlockManagerSuite:
   OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
   - SPARK-43221: Host local block fetching should use a block status with disk size *** FAILED ***
     0 was not greater than 0 The block size must be greater than 0 for a nonempty block! (BlockManagerSuite.scala:491)
   Run completed in 6 seconds, 705 milliseconds.
   Total number of tests run: 1
   Suites: completed 1, aborted 0
   Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
   *** 1 TEST FAILED ***
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   

