Hello All, I want to know more about data exchange between Spark workers in standalone mode. Every time a task reads the result of another task, I want to log that event.
Information I need:

  - source task / stage
  - destination task / stage
  - size of the data transfer

So far I've managed to do something similar by changing two methods in Spark Core.

To record which task produced which partition / block, I added this to SortShuffleWriter.scala#SortShuffleWriter#write:

  logError(s"""PRODUCED SORT:
              |BlockId: ${blockId.shuffleId} ${blockId.mapId}
              |PartitionId: ${context.partitionId()}
              |TaskAttemptId: ${context.taskAttemptId()}
              |StageId: ${context.stageId()}""".stripMargin)

To record which task consumed which partition / block, I added this to ShuffleBlockFetcherIterator.scala#ShuffleBlockFetcherIterator#sendRequest:

  blockIds.foreach { blockId =>
    logError(s"""CONSUMED:
                |BlockId: ${blockId}
                |PartitionId: ${context.partitionId()}
                |TaskAttemptId: ${context.taskAttemptId()}
                |StageId: ${context.stageId()}
                |Address: ${address}
                |Size: ${sizeMap(blockId)}""".stripMargin)
  }

Using these two changes, I managed to partially reconstruct the communication graph, but there are a couple of problems:

  1. I cannot match all PRODUCED/CONSUMED log entries with each other.
  2. The amount of data (the "size" field) does not match the real traffic numbers I got from the OS; it does, however, match the Shuffle Read/Write numbers on the Spark History Server.

I've found an article that explains data exchange in Apache Flink to a certain extent:
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks

Is there something similar for Spark?

Thanks.
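P.S. As a sanity check for problem 2, I'm considering pulling the same per-task Shuffle Read/Write totals through a SparkListener instead of patched Core code. A minimal sketch, assuming the Spark 2.x TaskMetrics API (the exact metric fields are my reading of that API, so treat them as an assumption):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

  class ShuffleVolumeListener extends SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      val m = taskEnd.taskMetrics
      if (m != null) {  // metrics can be missing for failed tasks
        // Bytes this task fetched from remote executors vs. read from
        // local disk, and bytes it wrote for downstream stages.
        println(s"stage=${taskEnd.stageId} task=${taskEnd.taskInfo.taskId} " +
          s"remoteRead=${m.shuffleReadMetrics.remoteBytesRead} " +
          s"localRead=${m.shuffleReadMetrics.localBytesRead} " +
          s"written=${m.shuffleWriteMetrics.bytesWritten}")
      }
    }
  }

  // registered once per application:
  // sc.addSparkListener(new ShuffleVolumeListener)

This only gives per-task totals, not the source-task -> destination-task edges, so it would be a consistency check rather than a replacement for the logging above.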