Re: parquet predicate / projection pushdown into unionAll

2014-09-10 Thread Cody Koeninger
So the obvious thing I was missing is that the analyzer has already
resolved attributes by the time the optimizer runs, so the references in
the filter / projection need to be fixed up to match the children.

Created a PR, let me know if there's a better way to do it.  I'll see about
testing performance against some actual data sets.
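
For context, the fix-up amounts to rewriting the pushed-down expressions so they reference each child's own attribute ids. A minimal sketch, assuming the Catalyst AttributeMap helper and the 1.x binary Union(left, right) node (illustrative only, not necessarily the exact code in the PR):

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

object UnionPushdown extends Rule[LogicalPlan] {
  // Map each attribute exposed by the union (the left child's output)
  // to the positionally corresponding attribute of the right child.
  private def buildRewrites(u: Union): AttributeMap[Attribute] =
    AttributeMap(u.left.output.zip(u.right.output))

  // Rewrite an expression so it references the right child's attribute ids.
  private def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]): A =
    e.transform { case a: Attribute => rewrites(a) }.asInstanceOf[A]

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Push a filter below the union, rewriting its references for the right child.
    case Filter(condition, u @ Union(left, right)) =>
      val rewrites = buildRewrites(u)
      Union(Filter(condition, left),
            Filter(pushToRight(condition, rewrites), right))

    // Push a projection below the union, likewise rewriting for the right child.
    case Project(projectList, u @ Union(left, right)) =>
      val rewrites = buildRewrites(u)
      Union(Project(projectList, left),
            Project(projectList.map(pushToRight(_, rewrites)), right))
  }
}

Applying such a rule by hand in the repl, e.g. UnionPushdown(schemaRdd.queryExecution.analyzed) (names here are illustrative), is a quick way to check the resulting plan shape before wiring the rule into the optimizer.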

On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger  wrote:

> Ok, so looking at the optimizer code for the first time and trying the
> simplest rule that could possibly work,
>
> object UnionPushdown extends Rule[LogicalPlan] {
>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>     // Push down filter into union
>     case f @ Filter(condition, u @ Union(left, right)) =>
>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>
>     // Push down projection into union
>     case p @ Project(projectList, u @ Union(left, right)) =>
>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>   }
> }
>
>
> If I try manually applying that rule to a logical plan in the repl, it
> produces the query shape I'd expect, and executing that plan results in
> parquet pushdowns as I'd expect.
>
> But adding those cases to ColumnPruning results in a runtime exception
> (below)
>
> I can keep digging, but it seems like I'm missing some obvious initial
> context around naming of attributes.  If you can provide any pointers to
> speed me on my way I'd appreciate it.
>
>
> java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer()
> != WrappedArray(name#6, age#7), List(name#9, age#10, phones#11)
> at scala.Predef$.assert(Predef.scala:179)
> at
> org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
> at
> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
> at
> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
> at
> org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
> at
> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)
>
>
>
>
> On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust 
> wrote:
>
>> What Patrick said is correct.  Two other points:
>>  - In the 1.2 release we are hoping to beef up the support for working
>> with partitioned parquet independent of the metastore.
>>  - You can actually do operations like INSERT INTO for parquet tables to
>> add data.  This creates new parquet files for each insertion.  This will
>> break if there are multiple concurrent writers to the same table.
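
A minimal sketch of that append workflow in Scala (paths and table names here are hypothetical, and an existing SQLContext named sqlContext is assumed):

// Register an existing parquet directory as a queryable table.
sqlContext.parquetFile("hdfs:///data/events.parquet").registerTempTable("events")

// Appending writes additional parquet files under the table's location
// rather than rewriting it, which is why concurrent writers are unsafe.
val newRows = sqlContext.parquetFile("hdfs:///data/staged_events.parquet")
newRows.insertInto("events")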
>>
>> On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell 
>> wrote:
>>
>>> I think what Michael means is people often use this to read existing
>>> partitione

Re: parquet predicate / projection pushdown into unionAll

2014-09-10 Thread Cody Koeninger
Tested the patch against a cluster with some real data.  Initial results
suggest that going from one table to a union of 2 tables is now closer to a
doubling of query time, as expected, instead of 5 to 10x.

Let me know if you see any issues with that PR.

On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger  wrote:

> So the obvious thing I was missing is that the analyzer has already
> resolved attributes by the time the optimizer runs, so the references in
> the filter / projection need to be fixed up to match the children.
>
> Created a PR, let me know if there's a better way to do it.  I'll see
> about testing performance against some actual data sets.
>
> On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger  wrote:
>
>> Ok, so looking at the optimizer code for the first time and trying the
>> simplest rule that could possibly work,
>>
>> object UnionPushdown extends Rule[LogicalPlan] {
>>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>     // Push down filter into union
>>     case f @ Filter(condition, u @ Union(left, right)) =>
>>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>>
>>     // Push down projection into union
>>     case p @ Project(projectList, u @ Union(left, right)) =>
>>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>>   }
>> }
>>
>>
>> If I try manually applying that rule to a logical plan in the repl, it
>> produces the query shape I'd expect, and executing that plan results in
>> parquet pushdowns as I'd expect.
>>
>> But adding those cases to ColumnPruning results in a runtime exception
>> (below)
>>
>> I can keep digging, but it seems like I'm missing some obvious initial
>> context around naming of attributes.  If you can provide any pointers to
>> speed me on my way I'd appreciate it.
>>
>>
>> java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer()
>> != WrappedArray(name#6, age#7), List(name#9, age#10, phones#11)
>> at scala.Predef$.assert(Predef.scala:179)
>> at
>> org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>> at
>> org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)
>>
>>
>>
>>
>> On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust 
>> wrote:
>>
>>> What Patrick said is correct.  Two other points:
>>>  - In the 1.2 release we are hoping to beef up the support for working
>>> with par

Re: parquet predicate / projection pushdown into unionAll

2014-09-10 Thread Michael Armbrust
Hey Cody,

Thanks for doing this!  Will look at your PR later today.

Michael

On Wed, Sep 10, 2014 at 9:31 AM, Cody Koeninger  wrote:

> Tested the patch against a cluster with some real data.  Initial results
> suggest that going from one table to a union of 2 tables is now closer to a
> doubling of query time, as expected, instead of 5 to 10x.
>
> Let me know if you see any issues with that PR.
>
> On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger 
> wrote:
>
>> So the obvious thing I was missing is that the analyzer has already
>> resolved attributes by the time the optimizer runs, so the references in
>> the filter / projection need to be fixed up to match the children.
>>
>> Created a PR, let me know if there's a better way to do it.  I'll see
>> about testing performance against some actual data sets.
>>
>> On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger 
>> wrote:
>>
>>> Ok, so looking at the optimizer code for the first time and trying the
>>> simplest rule that could possibly work,
>>>
>>> object UnionPushdown extends Rule[LogicalPlan] {
>>>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>>     // Push down filter into union
>>>     case f @ Filter(condition, u @ Union(left, right)) =>
>>>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>>>
>>>     // Push down projection into union
>>>     case p @ Project(projectList, u @ Union(left, right)) =>
>>>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>>>   }
>>> }
>>>
>>>
>>> If I try manually applying that rule to a logical plan in the repl, it
>>> produces the query shape I'd expect, and executing that plan results in
>>> parquet pushdowns as I'd expect.
>>>
>>> But adding those cases to ColumnPruning results in a runtime exception
>>> (below)
>>>
>>> I can keep digging, but it seems like I'm missing some obvious initial
>>> context around naming of attributes.  If you can provide any pointers to
>>> speed me on my way I'd appreciate it.
>>>
>>>
>>> java.lang.AssertionError: assertion failed: ArrayBuffer() +
>>> ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10,
>>> phones#11)
>>> at scala.Predef$.assert(Predef.scala:179)
>>> at
>>> org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>> at
>>> org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at
>>> scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>>> at
>>> org.apache.spark.sql.SQLContext$Quer

Re: Lost executor on YARN ALS iterations

2014-09-10 Thread Sandy Ryza
That's right

On Tue, Sep 9, 2014 at 2:04 PM, Debasish Das 
wrote:

> Last time it did not show up on the Environment tab, but I will give it another
> shot... The expected behavior is that this property will show up, right?
>
> On Tue, Sep 9, 2014 at 12:15 PM, Sandy Ryza 
> wrote:
>
>> I would expect 2 GB would be enough or more than enough for 16 GB
>> executors (unless ALS is using a bunch of off-heap memory?).  You mentioned
>> earlier in this thread that the property wasn't showing up in the
>> Environment tab.  Are you sure it's making it in?
>>
>> -Sandy
>>
>> On Tue, Sep 9, 2014 at 11:58 AM, Debasish Das 
>> wrote:
>>
>>> Hmm... I did try increasing it to a few GB but did not get a successful run
>>> yet...
>>>
>>> Any idea, if I am using say 40 executors each running 16 GB, what the
>>> typical spark.yarn.executor.memoryOverhead would be for, say, 100M x 10M
>>> matrices with a few billion ratings?
>>>
>>> On Tue, Sep 9, 2014 at 10:49 AM, Sandy Ryza 
>>> wrote:
>>>
 Hi Deb,

 The current state of the art is to increase
 spark.yarn.executor.memoryOverhead until the job stops failing.  We do have
 plans to try to automatically scale this based on the amount of memory
 requested, but it will still just be a heuristic.

 -Sandy
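
For anyone wanting to try that, a minimal Scala sketch of bumping the overhead programmatically; the 2048 MB figure is purely an illustrative starting point, not a recommendation:

import org.apache.spark.{SparkConf, SparkContext}

// spark.yarn.executor.memoryOverhead is specified in megabytes in the 1.x line.
val conf = new SparkConf()
  .setAppName("als-on-yarn")                          // app name is illustrative
  .set("spark.executor.memory", "16g")
  .set("spark.yarn.executor.memoryOverhead", "2048")  // raise until executors stop getting killed
val sc = new SparkContext(conf)

It is also worth confirming the value actually shows up under the Environment tab of the UI, per Sandy's earlier question.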

 On Tue, Sep 9, 2014 at 7:32 AM, Debasish Das 
 wrote:

> Hi Sandy,
>
> Any resolution for YARN failures ? It's a blocker for running spark on
> top of YARN.
>
> Thanks.
> Deb
>
> On Tue, Aug 19, 2014 at 11:29 PM, Xiangrui Meng 
> wrote:
>
>> Hi Deb,
>>
>> I think this may be the same issue as described in
>> https://issues.apache.org/jira/browse/SPARK-2121 . We know that the
>> container got killed by YARN because it used much more memory than it
>> requested. But we haven't figured out the root cause yet.
>>
>> +Sandy
>>
>> Best,
>> Xiangrui
>>
>> On Tue, Aug 19, 2014 at 8:51 PM, Debasish Das <
>> debasish.da...@gmail.com> wrote:
>> > Hi,
>> >
>> > During the 4th ALS iteration, I am noticing that one of the executors
>> > gets disconnected:
>> >
>> > 14/08/19 23:40:00 ERROR network.ConnectionManager: Corresponding
>> > SendingConnectionManagerId not found
>> >
>> > 14/08/19 23:40:00 INFO cluster.YarnClientSchedulerBackend: Executor 5
>> > disconnected, so removing it
>> >
>> > 14/08/19 23:40:00 ERROR cluster.YarnClientClusterScheduler: Lost executor 5
>> > on tblpmidn42adv-hdp.tdc.vzwcorp.com: remote Akka client disassociated
>> >
>> > 14/08/19 23:40:00 INFO scheduler.DAGScheduler: Executor lost: 5 (epoch 12)
>> >
>> > Any idea if this is a bug related to akka on YARN?
>> >
>> > I am using master
>> >
>> > Thanks.
>> > Deb
>>
>
>

>>>
>>
>


Re: yet another jenkins restart early thursday morning -- 730am PDT (and a brief update on our new jenkins infra)

2014-09-10 Thread Nicholas Chammas
I'm looking forward to this. :)

Looks like Jenkins is having trouble triggering builds for new commits or
after user requests (e.g.
).
Hopefully that will be resolved tomorrow.

Nick

On Tue, Sep 9, 2014 at 5:00 PM, shane knapp  wrote:

> since the power incident last thursday, the github pull request builder
> plugin is still not really working 100%.  i found an open issue
> w/jenkins[1] that could definitely be affecting us, i will be pausing
> builds early thursday morning and then restarting jenkins.
> i'll send out a reminder tomorrow, and if this causes any problems for you,
> please let me know and we can work out a better time.
>
> but, now for some good news!  yesterday morning, we racked and stacked the
> systems for the new jenkins instance in the berkeley datacenter.  tomorrow
> i should be able to log in to them and start getting them set up and
> configured.  this is a major step in getting us in to a much more
> 'production' style environment!
>
> anyways:  thanks for your patience, and i think we've all learned that hard
> powering down your build system is a definite recipe for disaster.  :)
>
> shane
>
> [1] -- https://issues.jenkins-ci.org/browse/JENKINS-22509
>


Re: yet another jenkins restart early thursday morning -- 730am PDT (and a brief update on our new jenkins infra)

2014-09-10 Thread shane knapp
that's kinda what we're hoping as well.  :)

On Wed, Sep 10, 2014 at 2:46 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> I'm looking forward to this. :)
>
> Looks like Jenkins is having trouble triggering builds for new commits or
> after user requests (e.g.
> ).
> Hopefully that will be resolved tomorrow.
>
> Nick
>
> On Tue, Sep 9, 2014 at 5:00 PM, shane knapp  wrote:
>
>> since the power incident last thursday, the github pull request builder
>> plugin is still not really working 100%.  i found an open issue
>> w/jenkins[1] that could definitely be affecting us, i will be pausing
>> builds early thursday morning and then restarting jenkins.
>> i'll send out a reminder tomorrow, and if this causes any problems for
>> you,
>> please let me know and we can work out a better time.
>>
>> but, now for some good news!  yesterday morning, we racked and stacked the
>> systems for the new jenkins instance in the berkeley datacenter.  tomorrow
>> i should be able to log in to them and start getting them set up and
>> configured.  this is a major step in getting us in to a much more
>> 'production' style environment!
>>
>> anyways:  thanks for your patience, and i think we've all learned that
>> hard
>> powering down your build system is a definite recipe for disaster.  :)
>>
>> shane
>>
>> [1] -- https://issues.jenkins-ci.org/browse/JENKINS-22509
>>
>
>


Re: [RESULT] [VOTE] Release Apache Spark 1.1.0 (RC4)

2014-09-10 Thread Patrick Wendell
Hey just a heads up to everyone - running a bit behind on getting the
final artifacts and notes up. Finalizing this release was much more
complicated than previous ones due to new binary formats (we need to
redesign the download page a bit for this to work) and the large
increase in contributor count. Next time we can pipeline this work to
avoid a delay.

I did cut the v1.1.0 tag today. We should be able to do the full
announce tomorrow.

Thanks,
Patrick

On Sun, Sep 7, 2014 at 5:50 PM, Patrick Wendell  wrote:
> This vote passes with 8 binding +1 votes and no -1 votes. I'll post
> the final release in the next 48 hours... just finishing the release
> notes and packaging (which now takes a long time given the number of
> contributors!).
>
> +1:
> Reynold Xin*
> Michael Armbrust*
> Xiangrui Meng*
> Andrew Or*
> Sean Owen
> Matthew Farrellee
> Marcelo Vanzin
> Josh Rosen*
> Cheng Lian
> Mubarak Seyed
> Matei Zaharia*
> Nan Zhu
> Jeremy Freeman
> Denny Lee
> Tom Graves*
> Henry Saputra
> Egor Pahomov
> Rohit Sinha
> Kan Zhang
> Tathagata Das*
> Reza Zadeh
>
> -1:
>
> 0:
>
> * = binding
