Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/10835#discussion_r50742456
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
---
@@ -39,68 +37,144 @@ import org.apache.spark.util.Utils
*
* So, when adding new fields, take into consideration that the whole
object can be serialized for
* shipping off at any time to consumers of the SparkListener interface.
+ *
+ * @param initialAccums the initial set of accumulators that this
[[TaskMetrics]] depends on.
+ * Each accumulator in this initial set must be
uniquely named and marked
+ * as internal. Additional accumulators registered
later need not satisfy
+ * these requirements.
*/
@DeveloperApi
-class TaskMetrics extends Serializable {
+class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable
{
+
+ import InternalAccumulator._
+
+ // Needed for Java tests
+ def this() {
+ this(InternalAccumulator.create())
+ }
+
/**
- * Host's name the task runs on
+ * All accumulators registered with this task.
*/
- private var _hostname: String = _
- def hostname: String = _hostname
- private[spark] def setHostname(value: String) = _hostname = value
+ private val accums = new ArrayBuffer[Accumulable[_, _]]
+ accums ++= initialAccums
/**
- * Time taken on the executor to deserialize this task
+ * A map for quickly accessing the initial set of accumulators by name.
*/
- private var _executorDeserializeTime: Long = _
- def executorDeserializeTime: Long = _executorDeserializeTime
- private[spark] def setExecutorDeserializeTime(value: Long) =
_executorDeserializeTime = value
+ private val initialAccumsMap: Map[String, Accumulator[_]] = {
+ val map = new mutable.HashMap[String, Accumulator[_]]
+ initialAccums.foreach { a =>
+ assert(a.name.isDefined, "initial accumulators passed to TaskMetrics
must be named")
+ val name = a.name.get
+ assert(a.isInternal,
+ s"initial accumulator '$name' passed to TaskMetrics must be marked
as internal")
+ assert(!map.contains(name),
+ s"detected duplicate accumulator name '$name' when constructing
TaskMetrics")
+ map(name) = a
+ }
+ map.toMap
+ }
+ // Each metric is internally represented as an accumulator
+ private val _executorDeserializeTime =
getAccum(EXECUTOR_DESERIALIZE_TIME)
+ private val _executorRunTime = getAccum(EXECUTOR_RUN_TIME)
+ private val _resultSize = getAccum(RESULT_SIZE)
+ private val _jvmGCTime = getAccum(JVM_GC_TIME)
+ private val _resultSerializationTime =
getAccum(RESULT_SERIALIZATION_TIME)
+ private val _memoryBytesSpilled = getAccum(MEMORY_BYTES_SPILLED)
+ private val _diskBytesSpilled = getAccum(DISK_BYTES_SPILLED)
+ private val _peakExecutionMemory = getAccum(PEAK_EXECUTION_MEMORY)
+ private val _updatedBlockStatuses =
+ TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap,
UPDATED_BLOCK_STATUSES)
/**
- * Time the executor spends actually running the task (including
fetching shuffle data)
+ * Time taken on the executor to deserialize this task.
*/
- private var _executorRunTime: Long = _
- def executorRunTime: Long = _executorRunTime
- private[spark] def setExecutorRunTime(value: Long) = _executorRunTime =
value
+ def executorDeserializeTime: Long = _executorDeserializeTime.localValue
/**
- * The number of bytes this task transmitted back to the driver as the
TaskResult
+ * Time the executor spends actually running the task (including
fetching shuffle data).
*/
- private var _resultSize: Long = _
- def resultSize: Long = _resultSize
- private[spark] def setResultSize(value: Long) = _resultSize = value
+ def executorRunTime: Long = _executorRunTime.localValue
+ /**
+ * The number of bytes this task transmitted back to the driver as the
TaskResult.
+ */
+ def resultSize: Long = _resultSize.localValue
/**
- * Amount of time the JVM spent in garbage collection while executing
this task
+ * Amount of time the JVM spent in garbage collection while executing
this task.
*/
- private var _jvmGCTime: Long = _
- def jvmGCTime: Long = _jvmGCTime
- private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value
+ def jvmGCTime: Long = _jvmGCTime.localValue
/**
- * Amount of time spent serializing the task result
+ * Amount of time spent serializing the task result.
*/
- private var _resultSerializationTime: Long = _
- def resultSerializationTime: Long = _resultSerializationTime
- private[spark] def setResultSerializationTime(value: Long) =
_resultSerializationTime = value
+ def resultSerializationTime: Long = _resultSerializationTime.localValue
/**
- * The number of in-memory bytes spilled by this task
+ * The number of in-memory bytes spilled by this task.
*/
- private var _memoryBytesSpilled: Long = _
- def memoryBytesSpilled: Long = _memoryBytesSpilled
- private[spark] def incMemoryBytesSpilled(value: Long): Unit =
_memoryBytesSpilled += value
- private[spark] def decMemoryBytesSpilled(value: Long): Unit =
_memoryBytesSpilled -= value
+ def memoryBytesSpilled: Long = _memoryBytesSpilled.localValue
/**
- * The number of on-disk bytes spilled by this task
+ * The number of on-disk bytes spilled by this task.
*/
- private var _diskBytesSpilled: Long = _
- def diskBytesSpilled: Long = _diskBytesSpilled
- private[spark] def incDiskBytesSpilled(value: Long): Unit =
_diskBytesSpilled += value
- private[spark] def decDiskBytesSpilled(value: Long): Unit =
_diskBytesSpilled -= value
+ def diskBytesSpilled: Long = _diskBytesSpilled.localValue
+
+ /**
+ * Peak memory used by internal data structures created during shuffles,
aggregations and
+ * joins. The value of this accumulator should be approximately the sum
of the peak sizes
+ * across all such data structures created in this task. For SQL jobs,
this only tracks all
+ * unsafe operators and ExternalSort.
+ */
+ def peakExecutionMemory: Long = _peakExecutionMemory.localValue
+
+ /**
+ * Storage statuses of any blocks that have been updated as a result of
this task.
+ */
+ def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] =
_updatedBlockStatuses.localValue
+
+ @deprecated("use updatedBlockStatuses instead", "2.0.0")
+ def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
+ if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
+ }
+
+ // Setters and increment-ers
+ private[spark] def setExecutorDeserializeTime(v: Long): Unit =
+ _executorDeserializeTime.setValue(v)
+ private[spark] def setExecutorRunTime(v: Long): Unit =
_executorRunTime.setValue(v)
+ private[spark] def setResultSize(v: Long): Unit = _resultSize.setValue(v)
+ private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v)
+ private[spark] def setResultSerializationTime(v: Long): Unit =
+ _resultSerializationTime.setValue(v)
+ private[spark] def incMemoryBytesSpilled(v: Long): Unit =
_memoryBytesSpilled.add(v)
+ private[spark] def incDiskBytesSpilled(v: Long): Unit =
_diskBytesSpilled.add(v)
+ private[spark] def incPeakExecutionMemory(v: Long): Unit =
_peakExecutionMemory.add(v)
+ private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId,
BlockStatus)]): Unit =
+ _updatedBlockStatuses.add(v)
+ private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId,
BlockStatus)]): Unit =
+ _updatedBlockStatuses.setValue(v)
+
+ /**
+ * Host's name the task runs on.
+ */
+ @deprecated("hostname will be removed from TaskMetrics in the future",
"2.0.0")
+ def hostname: String = Utils.localHostName()
--- End diff --
OK, I'll just remove the deprecated `hostname` accessor.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]