Hi Imran,

Thank you for your email.
In examining the condition (t2 - t1) < (t_ser + t_deser + t_exec), I have found it to hold, although I have not included t_{wait_for_read} in this, since it has---so far as I can tell---been either zero or negligible compared to the task time.

Thanks,
Mike

On 6/8/15, Imran Rashid <iras...@cloudera.com> wrote:
> Hi Mike,
>
> all good questions, let me take a stab at answering them:
>
> 1. Event Logs + Stages:
>
> It's normal for stages to get skipped if they are shuffle map stages which get read multiple times. E.g., here's a little example program I wrote earlier to demonstrate this: "d3" doesn't need to be re-shuffled since each time it's read with the same partitioner. So skipping stages in this way is a good thing:
>
> val partitioner = new org.apache.spark.HashPartitioner(10)
> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }.partitionBy(partitioner)
> (0 until 5).foreach { idx =>
>   val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }.partitionBy(partitioner)
>   println(idx + " ---> " + otherData.join(d3).count())
> }
>
> If you run this and look in the UI, you'll see that all jobs except for the first one have one stage that is skipped. You will also see this in the log:
>
> 15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: List(Stage 12, Stage 13)
>
> 15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage 13)
>
> Admittedly that is not very clear, but it is sort of indicating to you that the DAGScheduler first created stage 12 as a necessary step, and then later on changed its mind by realizing that everything it needed for stage 12 already existed, so there was nothing to do.
>
> 2. Extracting Event Log Information
>
> Maybe you are interested in SparkListener? Unfortunately, I don't know of a good blog post describing it; hopefully the docs are clear ...
>
> 3. Time Metrics in Spark Event Log
>
> This is a great question. I *think* the only exception is that t_gc is really overlapped with t_exec. So I think you should really expect
>
> (t2 - t1) < (t_ser + t_deser + t_exec)
>
> I am not 100% sure about this, though. I'd be curious if that constraint was ever violated.
>
> As for your question on shuffle read vs. shuffle write time -- I wouldn't necessarily expect the same stage to have times for both shuffle read & shuffle write -- in the simplest case, you'll have shuffle write times in one, and shuffle read times in the next one. But even taking that into account, there is a difference in the way they work & are measured. Shuffle read operations are pipelined, and the way we measure shuffle read, it's just how much time is spent *waiting* for network transfer. It could be that there is no (measurable) wait time b/c the next blocks are fetched before they are needed. Shuffle writes occur in the normal task execution thread, though, so we (try to) measure all of it.
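[Editorial note: for reference, a minimal sketch of the SparkListener approach Imran mentions in point 2 above -- appending one delimiter-separated row per finished task. This is only an illustration against the Spark 1.4-era listener API, not the built-in metrics sink; the output path and column choice are arbitrary.]

import java.io.PrintWriter

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative only: append one comma-separated row per finished task.
class CsvTaskListener(path: String) extends SparkListener {
  private val out = new PrintWriter(path)
  out.println("finishTime,taskId,host,execMs,gcMs,deserMs,serMs")

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    val metrics = taskEnd.taskMetrics   // may be null for failed tasks
    if (info != null && metrics != null) {
      out.println(Seq(info.finishTime, info.taskId, info.host,
        metrics.executorRunTime, metrics.jvmGCTime,
        metrics.executorDeserializeTime, metrics.resultSerializationTime).mkString(","))
      out.flush()
    }
  }
}

// Register it on the driver before running jobs:
// sc.addSparkListener(new CsvTaskListener("/tmp/task-metrics.csv"))

The event log itself is produced from these same listener callbacks, so the fields available here roughly mirror what appears in the JSON log discussed below.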
> On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Hi Patrick and Akhil,
>>
>> Thank you both for your responses. This is a bit of an extended email, but I'd like to:
>> 1. Answer your (Patrick) note about the "missing" stages, since the IDs do (briefly) appear in the event logs
>> 2. Ask for advice/experience with extracting information from the event logs in a columnar, delimiter-separated format
>> 3. Ask about the time metrics reported in the event logs; currently, the elapsed time for a task does not equal the sum of the times for its components
>>
>> 1. Event Logs + Stages:
>> =========================
>>
>> As I said before, in the Spark logs (the log4j-configurable ones from the driver), I only see references to some stages, where the stage IDs are not arithmetically increasing. In the event logs, however, I will see a reference to *every* stage, although not all stages will have tasks associated with them.
>>
>> For instance, by examining the stages that actually have tasks, you can see the missing stages:
>> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \
>> #   | grep -Eo '"Stage ID":[[:digit:]]+' \
>> #   | sort -n | uniq | head -n 5
>> "Stage ID":0
>> "Stage ID":1
>> "Stage ID":10
>> "Stage ID":11
>> "Stage ID":110
>>
>> However, these "missing" stages *do* appear in the event logs as stage IDs in the jobs submitted, i.e. for:
>> # grep -E '"Event":"SparkListenerJobStart"' app.log | grep -Eo 'Stage IDs":\[.*\]' | head -n 5
>> Stage IDs":[0,1,2]
>> Stage IDs":[5,3,4]
>> Stage IDs":[6,7,8]
>> Stage IDs":[9,10,11]
>> Stage IDs":[12,13,14]
>>
>> I do not know if this amounts to a bug, since I am not familiar with the scheduler in detail. The stages have seemingly been created somewhere in the DAG, but then have no associated tasks and never appear again.
>>
>> 2. Extracting Event Log Information
>> ====================================
>> Currently we are running scalability tests, and are finding very poor scalability for certain block matrix algorithms. I would like to have finer detail about the communication time and bandwidth when data is transferred between nodes.
>>
>> I would really just like to have a file with nothing but task info in a format such as:
>> timestamp (ms), task ID, hostname, execution time (ms), GC time (ms), ...
>> 0010294, 1, slave-1, 503, 34, ...
>> 0010392, 2, slave-2, 543, 32, ...
>> and similarly for jobs/stages/rdd_memory/shuffle output/etc.
>>
>> I have extracted the relevant time fields from the Spark event logs with a sed script, but I wonder if there is an even more expedient way. Unfortunately, I do not immediately see how to do this using the $SPARK_HOME/conf/metrics.properties file and haven't come across a blog/etc. that describes this. Could anyone please comment on whether or not a metrics configuration for this already exists?
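[Editorial note: one expedient alternative to a sed script is to load the event log with Spark's own JSON reader, since each line of the log is one JSON object. A sketch, assuming Spark 1.4's sqlContext.read.json; the log path is a placeholder and the field names ("Task Info", "Executor Run Time", ...) follow my reading of JsonProtocol in 1.4, so verify them against a line of the actual log before relying on this.]

// Hypothetical event log path; one JSON event per line.
val events = sqlContext.read.json("file:///tmp/spark-events/app-1234.log")

val info    = events("Task Info")
val metrics = events("Task Metrics")

val tasks = events
  .filter(events("Event") === "SparkListenerTaskEnd")
  .select(
    info.getField("Launch Time").as("launch_ms"),
    info.getField("Finish Time").as("finish_ms"),
    info.getField("Task ID").as("task_id"),
    info.getField("Host").as("host"),
    metrics.getField("Executor Run Time").as("exec_ms"),
    metrics.getField("JVM GC Time").as("gc_ms"),
    metrics.getField("Executor Deserialize Time").as("deser_ms"),
    metrics.getField("Result Serialization Time").as("ser_ms"))

// One delimiter-separated line per task, written to a single part file.
tasks.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile("/tmp/task-metrics-csv")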
>> 3. Time Metrics in Spark Event Log
>> ==================================
>> I am confused about the times reported for tasks in the event log. There are launch and finish timestamps given for each task (call them t1 and t2, respectively), as well as GC time (t_gc), execution time (t_exec), and serialization times (t_ser, t_deser). However, the times do not add up as I would have expected. I would imagine that the elapsed time t2 - t1 would be slightly larger than the sum of the component times. However, I can find many instances in the event logs where:
>> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec)
>> The difference can be 500 ms or more, which is not negligible for my current execution times of ~5000 ms. I have attached a plot that illustrates this.
>>
>> Regarding this, I'd like to ask:
>> 1. How exactly are these times being measured?
>> 2. Should the sum of the component times equal the elapsed (clock) time for the task?
>> 3. If not, which component(s) is(are) being excluded, and when do they occur?
>> 4. There are occasionally reported measurements for shuffle write time, but not shuffle read time. Is there a method to determine the time required to shuffle data? Could this be done by looking at delays between the first task in a new stage and the last task in the previous stage?
>>
>> Thank you very much for your time,
>> Mike
>>
>> On 6/7/15, Patrick Wendell <pwend...@gmail.com> wrote:
>> > Hey Mike,
>> >
>> > Stage IDs are not guaranteed to be sequential because of the way the DAG scheduler works (they are only guaranteed to be increasing). In some cases stage ID numbers are skipped when stages are generated.
>> >
>> > Any stage/ID that appears in the Spark UI is an actual stage, so if you see IDs in there, but they are not in the logs, then let us know (that would be a bug).
>> >
>> > - Patrick
>> >
>> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>> >> Are you seeing the same behavior on the driver UI (the one running on port 4040)? If you click on the stage ID header you can sort the stages based on IDs.
>> >>
>> >> Thanks
>> >> Best Regards
>> >>
>> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91m...@gmail.com> wrote:
>> >>>
>> >>> Hi folks,
>> >>>
>> >>> When I look at the output logs for an iterative Spark program, I see that the stage IDs are not arithmetically numbered---that is, there are gaps between stages, and I might find log information about stages 0, 1, 2, 5, but not 3 or 4.
>> >>>
>> >>> As an example, the output from the Spark logs below shows what I mean:
>> >>>
>> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr | grep finished
>> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at blockMap.scala:1444) finished in 7.820 s:
>> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished in 3.874 s:
>> >>> 18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179) finished in 2.237 s:
>> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished in 1.749 s:
>> >>> 21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180) finished in 1.082 s:
>> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished in 2.078 s:
>> >>> 24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188) finished in 1.317 s:
>> >>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) finished in 1.638 s:
>> >>> 27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189) finished in 0.732 s:
>> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302) finished in 0.192 s:
>> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302) finished in 0.170 s:
>> >>> 28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201) finished in 0.270 s:
>> >>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) finished in 0.455 s:
>> >>> 29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274) finished in 0.928 s:
>> >>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) finished in 0.305 s:
>> >>> 30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275) finished in 0.391 s:
>> >>> 30452:INFO:DAGScheduler:Stage 32 (first at MatrixFactorizationModel.scala:60) finished in 0.028 s:
>> >>> 30506:INFO:DAGScheduler:Stage 36 (first at MatrixFactorizationModel.scala:60) finished in 0.023 s:
>> >>>
>> >>> Can anyone comment on this being normal behavior? Is it indicative of faults causing stages to be resubmitted? I also cannot find the missing stages in any stage's parent List(Stage x, Stage y, ...).
>> >>>
>> >>> Thanks,
>> >>> Mike
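[Editorial note: one way to cross-check the "planned but never run" explanation given earlier in the thread is to compare the stage IDs announced in SparkListenerJobStart against those that actually complete. A small sketch against the Spark 1.4-era listener API; the class and method names are illustrative, not an existing utility.]

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted}

// Sketch: record the stages each job plans versus the stages that actually complete;
// the difference is the set of skipped (reused shuffle map) stages.
class SkippedStageListener extends SparkListener {
  private val planned   = mutable.Set[Int]()
  private val completed = mutable.Set[Int]()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    planned ++= jobStart.stageIds

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    completed += stageCompleted.stageInfo.stageId

  def skipped: Set[Int] = planned.diff(completed).toSet
}

// val listener = new SkippedStageListener
// sc.addSparkListener(listener)
// ... run the jobs ...
// println("skipped stages: " + listener.skipped.toSeq.sorted.mkString(", "))

The IDs it reports should roughly line up with the gaps in the DAGScheduler "finished" lines quoted above.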
>> >>> On 6/1/15, Reynold Xin <r...@databricks.com> wrote:
>> >>> > Thanks, René. I actually added a warning to the new JDBC reader/writer interface for 1.4.0.
>> >>> >
>> >>> > Even with that, I think we should support throttling JDBC; otherwise it's too convenient for our users to DOS their production database servers!
>> >>> >
>> >>> > /**
>> >>> >  * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
>> >>> >  * url named table. Partitions of the table will be retrieved in parallel based on the
>> >>> >  * parameters passed to this function.
>> >>> >  *
>> >>> >  * *Don't create too many partitions in parallel on a large cluster; otherwise Spark
>> >>> >  * might crash your external database systems.*
>> >>> >  *
>> >>> >  * @param url JDBC database url of the form `jdbc:subprotocol:subname`
>> >>> >  * @param table Name of the table in the external database.
>> >>> >  * @param columnName the name of a column of integral type that will be used for partitioning.
>> >>> >  * @param lowerBound the minimum value of `columnName` used to decide partition stride
>> >>> >  * @param upperBound the maximum value of `columnName` used to decide partition stride
>> >>> >  * @param numPartitions the number of partitions. The range `minValue`-`maxValue` will be
>> >>> >  *                      split evenly into this many partitions
>> >>> >  * @param connectionProperties JDBC database connection arguments, a list of arbitrary
>> >>> >  *                             string tag/value. Normally at least a "user" and "password"
>> >>> >  *                             property should be included.
>> >>> >  *
>> >>> >  * @since 1.4.0
>> >>> >  */
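[Editorial note: a hedged usage sketch of the partitioned reader that doc comment describes (sqlContext.read.jdbc in Spark 1.4). The URL, table, column, credentials, and bounds below are placeholders.]

import java.util.Properties

// Placeholder connection settings; adjust for your own database.
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")

// Reads the hypothetical "events" table in parallel, partitioned on the integral
// column "id" into 16 stride-based partitions between the given bounds.
val df = sqlContext.read.jdbc(
  "jdbc:postgresql://db-host:5432/mydb",  // url
  "events",                               // table
  "id",                                   // columnName used for partitioning
  0L,                                     // lowerBound
  1000000L,                               // upperBound
  16,                                     // numPartitions: keep modest so the database is not overloaded
  props)

println(df.count())

René's "load them in chunks and use union" suggestion below would amount to issuing several such reads with disjoint bounds (or predicates) and combining the results with unionAll.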
like "WARNING: this method tries >> >>> >> to >> >>> >> load >> >>> >> as many partitions as possible, make sure your database can handle >> >>> >> the >> >>> >> load >> >>> >> or load them in chunks and use union"). SPARK-8008 >> >>> >> https://issues.apache.org/jira/browse/SPARK-8008 >> >>> >> >> >>> >> Regards, >> >>> >> Rene Treffer >> >>> >> >> >>> > >> >>> >> >>> >> >>> -- >> >>> Thanks, >> >>> Mike >> >>> >> >>> --------------------------------------------------------------------- >> >>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org >> >>> For additional commands, e-mail: dev-h...@spark.apache.org >> >>> >> >> >> > >> >> >> -- >> Thanks, >> Mike >> >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org >> For additional commands, e-mail: dev-h...@spark.apache.org >> > -- Thanks, Mike --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org