Hi all,

I have written a program and overridden two listener events, onStageCompleted and onTaskEnd. However, these two events only tell me when a Stage/Task has completed.

What I want to know is which Task corresponds to which stage of the DAG (the Spark history server only tells me how many stages a Job has and how many tasks a Stage has). Can I print out the edges between Tasks as the DAGScheduler sees them? Below is the program I have written:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

class CustomListener extends SparkListener {

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    println(s"Stage ${stageInfo.stageId}")
    println(s"Number of tasks: ${stageInfo.numTasks}")
    stageInfo.rddInfos.foreach { rddInfo =>
      println(s"RDD ${rddInfo.id} has ${rddInfo.numPartitions} partitions.")
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val stageId = taskEnd.stageId
    val taskInfo = taskEnd.taskInfo
    println(s"Task: ${taskInfo.taskId}; Stage: $stageId; Duration: ${taskInfo.duration} ms.")
  }

  // A simple word count whose stages and tasks the listener reports on.
  def wordCount(sc: SparkContext, inputPath: String): Unit = {
    val data = sc.textFile(inputPath)
    val words = data.flatMap(line => line.split(","))
    val pairs = words.map(word => (word, 1))
    val counts = pairs.reduceByKey(_ + _)
    counts.foreach(println)
  }
}

object Scenario1 {
  def main(args: Array[String]): Unit = {
    val appName = "scenario1"
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(appName)
      .getOrCreate()
    val sc = spark.sparkContext

    val sparkListener = new CustomListener()
    sc.addSparkListener(sparkListener)

    val inputPath = "s3a://data-join/file00"
    sparkListener.wordCount(sc, inputPath)
    sc.stop()
  }
}
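
To make it concrete, here is a rough, untested sketch of what I would like to get out of these two events (it assumes that StageInfo.parentIds reflects the stage-level edges of the DAG, and that grouping task ids by the stageId reported in onTaskEnd is the right way to attach tasks to that DAG):

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

class DagEdgeListener extends SparkListener {

  // stageId -> ids of the tasks that ran in that stage
  private val tasksByStage = mutable.Map.empty[Int, mutable.Buffer[Long]]

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // every finished task reports the id of the stage it belonged to
    tasksByStage.getOrElseUpdate(taskEnd.stageId, mutable.Buffer.empty[Long]) += taskEnd.taskInfo.taskId
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    // parentIds should hold the incoming stage-level edges of the DAG
    stageInfo.parentIds.foreach { parentId =>
      println(s"DAG edge: Stage $parentId -> Stage ${stageInfo.stageId}")
    }
    val tasks = tasksByStage.getOrElse(stageInfo.stageId, mutable.Buffer.empty[Long])
    println(s"Stage ${stageInfo.stageId} executed tasks: ${tasks.mkString(", ")}")
  }
}

As far as I can tell this only gives stage-level edges; tasks within a stage are the same function applied to different partitions, so I am not sure task-level edges exist at all.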
Best regards,
Truong

On Sun, Apr 16, 2023 at 09:32 Trường Trần Phan An <truong...@vlute.edu.vn> wrote:

> Dear Jacek Laskowski,
>
> Thank you for your guidance. I will try it out on my problem.
>
> Best regards,
> Truong
>
>
> On Fri, Apr 14, 2023 at 21:00 Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> Start by intercepting stage completions using SparkListenerStageCompleted [1]. That's Spark Core (jobs, stages and tasks).
>>
>> Then go up the execution chain to Spark SQL with SparkListenerSQLExecutionStart [2] and SparkListenerSQLExecutionEnd [3], and correlate the infos.
>>
>> You may also want to look at how the web UI works under the covers to collect all this information. Start from SQLTab [4], which shows what is displayed (and hence what information is needed and how it is collected).
>>
>> [1] https://github.com/apache/spark/blob/8cceb3946bdfa5ceac0f2b4fe6a7c43eafb76d59/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L46
>> [2] https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44
>> [3] https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L60
>> [4] https://github.com/apache/spark/blob/c124037b97538b2656d29ce547b2a42209a41703/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala#L24
>>
>> Regards,
>> Jacek Laskowski
>> ----
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
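
If I understand the suggestion above correctly, these SQL execution events arrive on the listener bus via onOtherEvent, and the correlation has to be done by matching the execution id against the jobs and stages observed while the query runs. Below is a minimal, untested sketch of what I plan to try (it assumes the event classes from [2] and [3] above, and that jobs triggered by a SQL query carry the execution id in their local properties under the "spark.sql.execution.id" key):

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

class SqlCorrelationListener extends SparkListener {

  // executionId -> stage ids of the jobs submitted by that SQL execution
  private val stagesByExecution = mutable.Map.empty[Long, mutable.Set[Int]]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // assumption: jobs triggered by a SQL query carry the execution id
    // in their local properties under "spark.sql.execution.id"
    for {
      props <- Option(jobStart.properties)
      id    <- Option(props.getProperty("spark.sql.execution.id"))
    } stagesByExecution.getOrElseUpdate(id.toLong, mutable.Set.empty[Int]) ++= jobStart.stageIds
  }

  // SQL execution events are not regular scheduler events; they arrive here
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      val stages = stagesByExecution.getOrElse(e.executionId, mutable.Set.empty[Int])
      println(s"SQL execution ${e.executionId} ended; its stages were: ${stages.toSeq.sorted.mkString(", ")}")
    case _ => // ignore everything else
  }
}

If this works, I should be able to join the per-execution stage set with the per-stage task lists from my listener above.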
>>
>> On Thu, Apr 13, 2023 at 10:40 AM Trường Trần Phan An <truong...@vlute.edu.vn> wrote:
>>
>>> Hi,
>>>
>>> Can you give me more details or point me to a tutorial on "You'd have to intercept execution events and correlate them. Not an easy task yet doable"?
>>>
>>> Thanks
>>>
>>> On Wed, Apr 12, 2023 at 21:04 Jacek Laskowski <ja...@japila.pl> wrote:
>>>
>>>> Hi,
>>>>
>>>> tl;dr It's not possible to "reverse-engineer" tasks to functions.
>>>>
>>>> In essence, Spark SQL is an abstraction layer over the RDD API, which is made up of partitions and tasks. Tasks are Scala functions (possibly with some Python for PySpark). A simple-looking high-level operator like DataFrame.join can end up as multiple RDDs, each with its own set of partitions (and hence tasks). What the tasks do is an implementation detail that you would have to learn by reading the source code of Spark SQL that produces this "bytecode".
>>>>
>>>> Just looking at the DAG or the task screenshots won't give you that level of detail. You'd have to intercept execution events and correlate them. Not an easy task, yet doable. HTH.
>>>>
>>>> Regards,
>>>> Jacek Laskowski
>>>> ----
>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>
>>>>
>>>> On Tue, Apr 11, 2023 at 6:53 PM Trường Trần Phan An <truong...@vlute.edu.vn> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am conducting a study comparing the execution time of the Bloom Filter Join operation on two environments: Apache Spark Cluster and Apache Spark. I have compared the overall time of the two environments, but I want to compare the specific tasks in each stage to see which computation shows the most significant difference.
>>>>>
>>>>> I have taken a screenshot of the DAG of Stage 0 and of the list of tasks executed in Stage 0:
>>>>> - DAG.png
>>>>> - Task.png
>>>>>
>>>>> I have two questions:
>>>>> 1. Can we determine which tasks are responsible for executing each step scheduled in the DAG during processing?
>>>>> 2. Is it possible to know the function of each task (e.g., what is task ID 0 responsible for? What is task ID 1 responsible for? ...)?
>>>>>
>>>>> Best regards,
>>>>> Truong
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org