Hi folks,

When I look at the output logs for an iterative Spark program, I see
that the stage IDs are not numbered consecutively: there are gaps
between stages, and I might find log information about Stages 0, 1, 2,
and 5, but not 3 or 4.

As an example, the output from the Spark logs below shows what I mean:

# grep -rE "Stage [[:digit:]]+" spark_stderr  | grep finished
12048:INFO:DAGScheduler:Stage 0 (mapPartitions at blockMap.scala:1444) finished in 7.820 s:
15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished in 3.874 s:
18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179) finished in 2.237 s:
20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished in 1.749 s:
21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180) finished in 1.082 s:
23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished in 2.078 s:
24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188) finished in 1.317 s:
26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) finished in 1.638 s:
27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189) finished in 0.732 s:
27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302) finished in 0.192 s:
27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302) finished in 0.170 s:
28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201) finished in 0.270 s:
28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) finished in 0.455 s:
29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274) finished in 0.928 s:
29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) finished in 0.305 s:
30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275) finished in 0.391 s:
30452:INFO:DAGScheduler:Stage 32 (first at MatrixFactorizationModel.scala:60) finished in 0.028 s:
30506:INFO:DAGScheduler:Stage 36 (first at MatrixFactorizationModel.scala:60) finished in 0.023 s:

Can anyone comment on whether this is normal behavior? Is it indicative
of faults causing stages to be resubmitted? I also cannot find the
missing stage IDs in any stage's list of parents (List(Stage x, Stage y, ...)).
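
For context, the driver loop has roughly this shape (a simplified,
hypothetical sketch with made-up names and sizes, not the actual
blockMap.scala code); every iteration runs a map followed by two separate
actions, so each iteration contributes several freshly numbered stages:

import org.apache.spark.{SparkConf, SparkContext}

object StageIdSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-id-sketch").setMaster("local[2]"))

    // A shuffled, cached RDD that every iteration starts from.
    val base = sc.parallelize(1 to 100000)
      .map(i => (i % 100, 1L))
      .reduceByKey(_ + _)
      .cache()

    for (i <- 1 to 5) {
      // Analogous to the per-iteration map in my program.
      val scored = base.map { case (k, v) => (k, v * i) }
      println(s"iter $i count = ${scored.count()}") // action 1 -> one job
      scored.foreach(_ => ())                       // action 2 -> another job
    }

    sc.stop()
  }
}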

Thanks,
Mike


On 6/1/15, Reynold Xin <r...@databricks.com> wrote:
> Thanks, René. I actually added a warning to the new JDBC reader/writer
> interface for 1.4.0.
>
> Even with that, I think we should support throttling JDBC; otherwise it's
> too convenient for our users to DoS their production database servers!
>
>
>   /**
>    * Construct a [[DataFrame]] representing the database table accessible
>    * via JDBC URL url named table. Partitions of the table will be retrieved
>    * in parallel based on the parameters passed to this function.
>    *
>    * Don't create too many partitions in parallel on a large cluster;
>    * otherwise Spark might crash your external database systems.
>    *
>    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
>    * @param table Name of the table in the external database.
>    * @param columnName the name of a column of integral type that will be
>    *                   used for partitioning.
>    * @param lowerBound the minimum value of `columnName` used to decide
>    *                   partition stride
>    * @param upperBound the maximum value of `columnName` used to decide
>    *                   partition stride
>    * @param numPartitions the number of partitions. The range
>    *                      `lowerBound`-`upperBound` will be split evenly
>    *                      into this many partitions.
>    * @param connectionProperties JDBC database connection arguments, a list
>    *                             of arbitrary string tag/value pairs.
>    *                             Normally at least a "user" and "password"
>    *                             property should be included.
>    *
>    * @since 1.4.0
>    */
>
>
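
(For reference, a hedged usage sketch of the partitioned read described in
that scaladoc; the URL, table, column, bounds, partition count, and
credentials are all made-up example values, and sqlContext is assumed to be
the SQLContext of a Spark 1.4 shell.)

import java.util.Properties

// Hedged sketch only: every connection detail below is an invented example.
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")

val df = sqlContext.read.jdbc(
  "jdbc:postgresql://db-host:5432/sales", // url (made up)
  "orders",                               // table
  "order_id",                             // columnName: integral partition column
  1L,                                     // lowerBound
  10000000L,                              // upperBound
  16,                                     // numPartitions: keep modest for the DB
  props)                                  // connectionProperties

println(df.rdd.partitions.length)         // should report 16 partitions
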
> On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <rtref...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm using sqlContext.jdbc(uri, table, where).map(_ => 1).aggregate(0)(_+_,_+_)
>> in an interactive shell, where "where" is an Array[String] of 32 to 48
>> elements. (The code is tailored to our db, specifically through the where
>> conditions; otherwise I'd have posted it.) That should be the DataFrame API,
>> but I'm just trying to load everything and discard it as soon as possible :-)
>>
>> (1) Never silently drop values by default: it kills confidence.
>> An option sounds reasonable. Some sort of insight / log would be great.
>> (How many columns of what type were truncated? Why?)
>> Note that I could declare the field as a string via JdbcDialects (thank you
>> guys for merging that :-) ).
>> I have had quite bad experiences with silent drops / truncations of columns
>> and thus _like_ Spark's strict approach. It causes trouble, but noticing
>> later that your data was corrupted during conversion is even worse.
>>
>> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
>>
>> (3) One option would be to make it safe to use; the other would be to
>> document the behavior (something like "WARNING: this method tries to load
>> as many partitions as possible in parallel; make sure your database can
>> handle the load, or load the partitions in chunks and union them").
>> SPARK-8008 https://issues.apache.org/jira/browse/SPARK-8008
>>
>> Regards,
>>   Rene Treffer
>>
>
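
(A hedged sketch of the "load in chunks and union" workaround mentioned in
point (3) of the quoted mail; sqlContext, the JDBC URL, table, predicate
column, and chunk size are all assumed or made-up values.)

import java.util.Properties
import org.apache.spark.sql.DataFrame

val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")

// One WHERE clause per JDBC partition, as in the Array[String] described above.
val predicates: Array[String] = (0 until 48).map(i => s"id % 48 = $i").toArray

val chunkSize = 8 // how many partitions to pull from the database per round
val chunks: Seq[DataFrame] = predicates.grouped(chunkSize).toSeq.map { part =>
  val df = sqlContext.read.jdbc(
    "jdbc:mysql://db-host:3306/warehouse", "events", part, props).persist()
  df.count() // materialize this chunk before the next set of connections opens
  df
}

val all: DataFrame = chunks.reduce(_ unionAll _) // DataFrame.unionAll in Spark 1.4
println(all.count())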



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
