Hi Imran,

Thank you for your email.

In examining the condition (t2 - t1) < (t_ser + t_deser + t_exec), I
have found it to hold, although I have not included t_{wait_for_read}
in this, since, so far as I can tell, it has been either zero or
negligible compared to the task time.

Thanks,
Mike

On 6/8/15, Imran Rashid <iras...@cloudera.com> wrote:
> Hi Mike,
>
> all good questions, let me take a stab at answering them:
>
> 1. Event Logs + Stages:
>
> It's normal for stages to get skipped if they are shuffle map stages, which
> get read multiple times.  E.g., here's a little example program I wrote
> earlier to demonstrate this: "d3" doesn't need to be re-shuffled since it is
> read with the same partitioner each time.  So skipping stages in this way is
> a good thing:
>
> val partitioner = new org.apache.spark.HashPartitioner(10)
> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }.partitionBy(partitioner)
> (0 until 5).foreach { idx =>
>   val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }.partitionBy(partitioner)
>   println(idx + " ---> " + otherData.join(d3).count())
> }
>
> If you run this and look in the UI, you'll see that all jobs except
> the first one have one stage that is skipped.  You will also see this in
> the log:
>
> 15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: List(Stage 12,
> Stage 13)
>
> 15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage 13)
>
> Admittedly that is not very clear, but it indicates that the DAGScheduler
> first created Stage 12 as a necessary step, and then later realized that
> everything it needed for Stage 12 already existed, so there was nothing to
> do.
>
>
> 2. Extracting Event Log Information
>
> Maybe you are interested in SparkListener?  Unfortunately, I don't
> know of a good blog post describing it; hopefully the docs are clear ...
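>
> As a rough sketch of what that could look like (hedged: the field names on
> TaskInfo/TaskMetrics below follow the 1.x API and may differ slightly in
> other versions, so treat this as an illustration rather than verified
> code), here is a listener that prints one delimiter-separated row per
> finished task:
>
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
> class CsvTaskListener extends SparkListener {
>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>     val info = taskEnd.taskInfo
>     // taskMetrics can be null for failed tasks, so guard with Option
>     Option(taskEnd.taskMetrics).foreach { m =>
>       println(Seq(
>         info.launchTime,           // timestamp (ms)
>         info.taskId,
>         info.host,
>         m.executorRunTime,         // execution time (ms)
>         m.jvmGCTime,               // GC time (ms)
>         m.executorDeserializeTime, // deserialization time (ms)
>         m.resultSerializationTime  // result serialization time (ms)
>       ).mkString(","))
>     }
>   }
> }
>
> // register before running jobs, e.g. in the driver program
> sc.addSparkListener(new CsvTaskListener)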
>
> 3. Time Metrics in Spark Event Log
>
> This is a great question.  I *think* the only exception is that t_gc is
> really overlapped with t_exec.  So I think you should really expect
>
> (t2 - t1) < (t_ser + t_deser + t_exec)
>
> I am not 100% sure about this, though.  I'd be curious if that
> constraint was ever violated.
>
>
> As for your question on shuffle read vs. shuffle write time -- I wouldn't
> necessarily expect the same stage to have times for both shuffle read and
> shuffle write; in the simplest case, you'll have shuffle write times in one
> stage and shuffle read times in the next.  But even taking that into
> account, there is a difference in the way they work and are measured.
> Shuffle read operations are pipelined, and the way we measure shuffle read,
> it's just how much time is spent *waiting* for network transfer.  It could
> be that there is no (measurable) wait time because the next blocks are
> fetched before they are needed.  Shuffle writes occur in the normal task
> execution thread, though, so we (try to) measure all of it.
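>
> To make that concrete, a rough sketch under the same assumptions as the
> listener above (1.x TaskMetrics layout; the assumption that shuffleWriteTime
> is recorded in nanoseconds is worth re-checking for your version) of
> pulling both numbers out of TaskMetrics:
>
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
> class ShuffleTimeListener extends SparkListener {
>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>     Option(taskEnd.taskMetrics).foreach { m =>
>       // time spent blocked waiting on remote shuffle blocks (ms)
>       val readWaitMs = m.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L)
>       // shuffle write time, converted from nanoseconds to milliseconds
>       val writeMs = m.shuffleWriteMetrics.map(_.shuffleWriteTime / 1000000).getOrElse(0L)
>       println(s"task ${taskEnd.taskInfo.taskId}: readWait=${readWaitMs}ms write=${writeMs}ms")
>     }
>   }
> }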
>
>
> On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Hi Patrick and Akhil,
>>
>> Thank you both for your responses. This is a bit of an extended email,
>> but I'd like to:
>> 1. Answer your (Patrick) note about the "missing" stages since the IDs
>> do (briefly) appear in the event logs
>> 2. Ask for advice/experience with extracting information from the
>> event logs in a columnar, delimiter-separated format.
>> 3. Ask about the time metrics reported in the event logs; currently,
>> the elapsed time for a task does not equal the sum of the times for
>> its components
>>
>> 1. Event Logs + Stages:
>> =========================
>>
>> As I said before, in the Spark logs (the log4j-configurable ones from
>> the driver), I only see references to some stages, and the stage IDs
>> are not arithmetically increasing. In the event logs, however, I see
>> references to *every* stage, although not all stages have tasks
>> associated with them.
>>
>> For instance, if you examine only the stages that actually have tasks,
>> you can see the missing stage IDs:
>> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \
>> #               | grep -Eo '"Stage ID":[[:digit:]]+'  \
>> #               | sort -n|uniq | head -n 5
>> "Stage ID":0
>> "Stage ID":1
>> "Stage ID":10
>> "Stage ID":11
>> "Stage ID":110
>>
>> However, these "missing" stages *do* appear in the event logs as Stage
>> IDs in the jobs submitted, i.e: for
>> # grep -E '"Event":"SparkListenerJobStart"' app.log | grep -Eo 'Stage
>> IDs":\[.*\]' | head -n 5
>> Stage IDs":[0,1,2]
>> Stage IDs":[5,3,4]
>> Stage IDs":[6,7,8]
>> Stage IDs":[9,10,11]
>> Stage IDs":[12,13,14]
>>
>> I do not know if this amounts to a bug, since I am not familiar with
>> the scheduler in detail. The stages have seemingly been created
>> somewhere in the DAG, but then have no associated tasks and never
>> appear again.
>>
>> 2. Extracting Event Log Information
>> ====================================
>> Currently we are running scalability tests, and are finding very poor
>> scalability for certain block matrix algorithms. I would like to have
>> finer detail about the communication time and bandwidth when data is
>> transferred between nodes.
>>
>> I would really just like to have a file with nothing but task info in
>> a format such as:
>> timestamp (ms), task ID, hostname, execution time (ms), GC time (ms), ...
>> 0010294, 1, slave-1, 503, 34, ...
>> 0010392, 2, slave-2, 543, 32, ...
>> and similarly for jobs/stages/rdd_memory/shuffle output/etc.
>>
>> I have extracted the relevant time fields from the Spark event logs
>> with a sed script, but I wonder if there is an even more expedient
>> way. Unfortunately, I do not immediately see how to do this using the
>> $SPARK_HOME/conf/metrics.properties file and haven't come across a
>> blog/etc that describes this. Could anyone please comment on whether
>> or not a metrics configuration for this already exists?
>>
>> 3. Time Metrics in Spark Event Log
>> ==================================
>> I am confused about the times reported for tasks in the event log.
>> There are launch and finish timestamps given for each task (call them
>> t1 and t2, respectively), as well as GC time (t_gc), execution time
>> (t_exec), and serialization times (t_ser, t_deser). However, the times
>> do not add up as I would have expected. I would imagine that the
>> elapsed time t2 - t1 would be slightly larger than the sum of the
>> component times. However, I can find many instances in the event logs
>> where:
>> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec)
>> The difference can be 500 ms or more, which is not negligible for my
>> current execution times of ~5000 ms. I have attached a plot that
>> illustrates this.
>>
>> Regarding this, I'd like to ask:
>> 1. How exactly are these times being measured?
>> 2. Should the sum of the component times equal the elapsed (clock)
>> time for the task?
>> 3. If not, which component(s) is(are) being excluded, and when do they
>> occur?
>> 4. There are occasionally reported measurements for Shuffle Write
>> time, but not shuffle read time. Is there a method to determine the
>> time required to shuffle data? Could this be done by looking at delays
>> between the first task in a new stage and the last task in the
>> previous stage?
>>
>> Thank you very much for your time,
>> Mike
>>
>>
>> On 6/7/15, Patrick Wendell <pwend...@gmail.com> wrote:
>> > Hey Mike,
>> >
>> > Stage IDs are not guaranteed to be sequential (only increasing) because
>> > of the way the DAG scheduler works. In some cases stage ID numbers
>> > are skipped when stages are generated.
>> >
>> > Any stage/ID that appears in the Spark UI is an actual stage, so if
>> > you see IDs in there but they are not in the logs, then let us know
>> > (that would be a bug).
>> >
>> > - Patrick
>> >
>> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> > wrote:
>> >> Are you seeing the same behavior on the driver UI (the one running on
>> >> port 4040)? If you click on the stage ID header, you can sort the
>> >> stages by ID.
>> >>
>> >> Thanks
>> >> Best Regards
>> >>
>> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91m...@gmail.com> wrote:
>> >>>
>> >>> Hi folks,
>> >>>
>> >>> When I look at the output logs for an iterative Spark program, I see
>> >>> that the stage IDs are not numbered consecutively; that is, there
>> >>> are gaps between stages, and I might find log information about Stages
>> >>> 0, 1, 2, and 5, but not 3 or 4.
>> >>>
>> >>> As an example, the output from the Spark logs below shows what I
>> >>> mean:
>> >>>
>> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr  | grep finished
>> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at
>> >>> blockMap.scala:1444)
>> >>> finished in 7.820 s:
>> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished
>> >>> in 3.874 s:
>> >>> 18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179)
>> >>> finished in 2.237 s:
>> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished
>> >>> in 1.749 s:
>> >>> 21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180)
>> >>> finished in 1.082 s:
>> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished
>> >>> in 2.078 s:
>> >>> 24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188)
>> >>> finished in 1.317 s:
>> >>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817)
>> >>> finished
>> >>> in 1.638 s:
>> >>> 27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189)
>> >>> finished in 0.732 s:
>> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302)
>> >>> finished in 0.192 s:
>> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302)
>> >>> finished in 0.170 s:
>> >>> 28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201)
>> >>> finished in 0.270 s:
>> >>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355)
>> >>> finished
>> >>> in 0.455 s:
>> >>> 29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274)
>> >>> finished in 0.928 s:
>> >>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355)
>> >>> finished
>> >>> in 0.305 s:
>> >>> 30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275)
>> >>> finished in 0.391 s:
>> >>> 30452:INFO:DAGScheduler:Stage 32 (first at
>> >>> MatrixFactorizationModel.scala:60) finished in 0.028 s:
>> >>> 30506:INFO:DAGScheduler:Stage 36 (first at
>> >>> MatrixFactorizationModel.scala:60) finished in 0.023 s:
>> >>>
>> >>> Can anyone comment on whether this is normal behavior? Is it
>> >>> indicative of faults causing stages to be resubmitted? I also cannot
>> >>> find the missing stages in any stage's parent List(Stage x, Stage y, ...).
>> >>>
>> >>> Thanks,
>> >>> Mike
>> >>>
>> >>>
>> >>> On 6/1/15, Reynold Xin <r...@databricks.com> wrote:
>> >>> > Thanks, René. I actually added a warning to the new JDBC reader/writer
>> >>> > interface for 1.4.0.
>> >>> >
>> >>> > Even with that, I think we should support throttling JDBC; otherwise
>> >>> > it's too convenient for our users to DOS their production database
>> >>> > servers!
>> >>> >
>> >>> >
>> >>> >   /**
>> >>> >    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
>> >>> >    * url named table. Partitions of the table will be retrieved in parallel based on the
>> >>> >    * parameters passed to this function.
>> >>> >    *
>> >>> >    * Don't create too many partitions in parallel on a large cluster; otherwise Spark
>> >>> >    * might crash your external database systems.
>> >>> >    *
>> >>> >    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
>> >>> >    * @param table Name of the table in the external database.
>> >>> >    * @param columnName the name of a column of integral type that will be used for
>> >>> >    *                   partitioning.
>> >>> >    * @param lowerBound the minimum value of `columnName` used to decide partition stride
>> >>> >    * @param upperBound the maximum value of `columnName` used to decide partition stride
>> >>> >    * @param numPartitions the number of partitions.  the range `minValue`-`maxValue` will
>> >>> >    *                      be split evenly into this many partitions
>> >>> >    * @param connectionProperties JDBC database connection arguments, a list of arbitrary
>> >>> >    *                             string tag/value. Normally at least a "user" and
>> >>> >    *                             "password" property should be included.
>> >>> >    *
>> >>> >    * @since 1.4.0
>> >>> >    */
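>> >>> >
>> >>> > A hypothetical usage sketch of this reader (the URL, table, column, and
>> >>> > bounds below are made up; the call shape is assumed to match the 1.4
>> >>> > DataFrameReader.jdbc overload described above):
>> >>> >
>> >>> > import java.util.Properties
>> >>> >
>> >>> > val props = new Properties()
>> >>> > props.setProperty("user", "spark")      // hypothetical credentials
>> >>> > props.setProperty("password", "secret")
>> >>> >
>> >>> > // Partition the table on an integral column so reads happen in parallel;
>> >>> > // keep numPartitions modest so the database is not overwhelmed.
>> >>> > val df = sqlContext.read.jdbc(
>> >>> >   "jdbc:postgresql://dbhost:5432/mydb", // hypothetical URL
>> >>> >   "events",                             // hypothetical table
>> >>> >   "id",                                 // integral partition column
>> >>> >   0L,                                   // lowerBound
>> >>> >   1000000L,                             // upperBound
>> >>> >   8,                                    // numPartitions
>> >>> >   props)
>> >>> > df.count()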
>> >>> >
>> >>> >
>> >>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <rtref...@gmail.com>
>> >>> > wrote:
>> >>> >
>> >>> >> Hi,
>> >>> >>
>> >>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ => 1).aggregate(0)(_+_,_+_)
>> >>> >> in an interactive shell (where "where" is an Array[String] of 32 to 48
>> >>> >> elements).  (The code is tailored to our db, specifically through the
>> >>> >> where conditions; I would otherwise have posted it.)  That should be the
>> >>> >> DataFrame API, but I'm just trying to load everything and discard it as
>> >>> >> soon as possible :-)
>> >>> >>
>> >>> >> (1) Never do a silent drop of the values by default: it kills
>> >>> >> confidence.  An option sounds reasonable.  Some sort of insight / log
>> >>> >> would be great.  (How many columns of what type were truncated? Why?)
>> >>> >> Note that I could declare the field as string via JdbcDialects (thank
>> >>> >> you guys for merging that :-) ).
>> >>> >> I have had quite bad experiences with silent drops / truncates of
>> >>> >> columns and thus _like_ the strict way of Spark. It causes trouble, but
>> >>> >> noticing later that your data was corrupted during conversion is even
>> >>> >> worse.
>> >>> >>
>> >>> >> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
>> >>> >>
>> >>> >> (3) One option would be to make it safe to use; the other would be to
>> >>> >> document the behavior (something like "WARNING: this method tries to
>> >>> >> load as many partitions as possible, make sure your database can handle
>> >>> >> the load, or load them in chunks and use union"). SPARK-8008
>> >>> >> https://issues.apache.org/jira/browse/SPARK-8008
>> >>> >>
>> >>> >> Regards,
>> >>> >>   Rene Treffer
>> >>> >>
>> >>> >
>> >>>
>> >>>
>> >>> --
>> >>> Thanks,
>> >>> Mike
>> >>>
>> >>
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>>
>


-- 
Thanks,
Mike

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
