Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/2803#discussion_r18857966
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -18,131 +18,55 @@
package org.apache.spark;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import scala.Function0;
import scala.Function1;
import scala.Unit;
-import scala.collection.JavaConversions;
import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.util.TaskCompletionListener;
-import org.apache.spark.util.TaskCompletionListenerException;
/**
-* :: DeveloperApi ::
-* Contextual information about a task which can be read or mutated during
execution.
-*/
-@DeveloperApi
-public class TaskContext implements Serializable {
-
- private int stageId;
- private int partitionId;
- private long attemptId;
- private boolean runningLocally;
- private TaskMetrics taskMetrics;
-
- /**
- * :: DeveloperApi ::
- * Contextual information about a task which can be read or mutated
during execution.
- *
- * @param stageId stage id
- * @param partitionId index of the partition
- * @param attemptId the number of attempts to execute this task
- * @param runningLocally whether the task is running locally in the
driver JVM
- * @param taskMetrics performance metrics of the task
- */
- @DeveloperApi
- public TaskContext(int stageId, int partitionId, long attemptId, boolean
runningLocally,
- TaskMetrics taskMetrics) {
- this.attemptId = attemptId;
- this.partitionId = partitionId;
- this.runningLocally = runningLocally;
- this.stageId = stageId;
- this.taskMetrics = taskMetrics;
- }
-
- /**
- * :: DeveloperApi ::
- * Contextual information about a task which can be read or mutated
during execution.
- *
- * @param stageId stage id
- * @param partitionId index of the partition
- * @param attemptId the number of attempts to execute this task
- * @param runningLocally whether the task is running locally in the
driver JVM
- */
- @DeveloperApi
- public TaskContext(int stageId, int partitionId, long attemptId, boolean
runningLocally) {
- this.attemptId = attemptId;
- this.partitionId = partitionId;
- this.runningLocally = runningLocally;
- this.stageId = stageId;
- this.taskMetrics = TaskMetrics.empty();
- }
-
+ * Contextual information about a task which can be read or mutated during
+ * execution. To access the TaskContext for a running task use
+ * TaskContext.get().
+ */
+public abstract class TaskContext implements Serializable {
/**
- * :: DeveloperApi ::
- * Contextual information about a task which can be read or mutated
during execution.
- *
- * @param stageId stage id
- * @param partitionId index of the partition
- * @param attemptId the number of attempts to execute this task
+ * Return the currently active TaskContext. This can be called inside of
+ * user functions to access contextual information about running tasks.
*/
- @DeveloperApi
- public TaskContext(int stageId, int partitionId, long attemptId) {
- this.attemptId = attemptId;
- this.partitionId = partitionId;
- this.runningLocally = false;
- this.stageId = stageId;
- this.taskMetrics = TaskMetrics.empty();
+ public static TaskContext get() {
+ return taskContext.get();
}
private static ThreadLocal<TaskContext> taskContext =
new ThreadLocal<TaskContext>();
- /**
- * :: Internal API ::
- * This is spark internal API, not intended to be called from user
programs.
- */
- public static void setTaskContext(TaskContext tc) {
+ static void setTaskContext(TaskContext tc) {
taskContext.set(tc);
}
- public static TaskContext get() {
- return taskContext.get();
- }
-
- /** :: Internal API :: */
- public static void unset() {
+ static void unset() {
taskContext.remove();
}
- // List of callback functions to execute when the task completes.
- private transient List<TaskCompletionListener> onCompleteCallbacks =
- new ArrayList<TaskCompletionListener>();
-
- // Whether the corresponding task has been killed.
- private volatile boolean interrupted = false;
-
- // Whether the task has completed.
- private volatile boolean completed = false;
-
/**
- * Checks whether the task has completed.
+ * Whether the task has completed.
*/
- public boolean isCompleted() {
- return completed;
- }
+ public abstract boolean isCompleted();
/**
- * Checks whether the task has been killed.
+ * Whether the task has been killed.
*/
- public boolean isInterrupted() {
- return interrupted;
- }
+ public abstract boolean isInterrupted();
+
+ @Deprecated
--- End diff --
super nit: place the `@Deprecated` annotation after the Javadoc comment, not before it.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]