The two plans look similar, but there is a big difference between them once you consider that your source is actually a NoSQL database such as C*.
The OR plan contains "Filter ((id#0L = 94) || (id#0L = 2))", which means the filter is actually applied on the Spark side instead of on the C* side. To answer your query, Spark therefore has to load all the data from C* (imagine you have millions of IDs), filter most of it out, and keep only the rows with id 94 and 2. IO is the bottleneck in this case, because a huge amount of data has to be transferred from C* to Spark.

In the other case, the ids are pushed down to C* (and in most cases the id is the primary key, or at least the partition key), so C* finds the data for these 2 ids very quickly and returns only the matching rows to Spark, which then runs the aggregation on a very small amount of data. That is why the performance differs so much between the two cases.

You can argue that the Spark-Cassandra connector should be smarter about handling the "OR" case. But in general OR is not easy to handle, because in most cases "OR" is applied across different columns rather than to a single id column as it is here.

If your query uses partition keys in C*, always use them with either "=" or "in". If not, you have to wait for the data transfer from C* to Spark. Spark + C* lets you run any ad-hoc query, but you need to know the underlying price you pay.

Yong

________________________________
From: Takeshi Yamamuro <[email protected]>
Sent: Thursday, August 4, 2016 8:18 AM
To: Marco Colombo
Cc: user
Subject: Re: Spark SQL and number of task

It seems the performance difference comes from `CassandraSourceRelation`. I'm not familiar with the implementation, but I guess the `IN` filter is pushed down into the data source and the other one is not. You'd be better off checking the performance metrics in the web UI.

// maropu

On Thu, Aug 4, 2016 at 8:41 PM, Marco Colombo <[email protected]> wrote:

Ok, thanx.
The 2 plans are very similar.

With the in condition:

    == Physical Plan ==
    TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
    +- TungstenExchange hashpartitioning(id#0L,10), None
       +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#85,count#86L])
          +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]

With the or condition:

    == Physical Plan ==
    TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
    +- TungstenExchange hashpartitioning(id#0L,10), None
       +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#92,count#93L])
          +- Filter ((id#0L = 94) || (id#0L = 2))
             +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]

Filters are pushed down, so I cannot
understand why it performs such a big data extraction in the or case. It's like a full table scan. Any advice?

Thanks!

2016-08-04 13:25 GMT+02:00 Takeshi Yamamuro <[email protected]>:

Hi,

Please type `sqlCtx.sql("select * .... ").explain` to show execution plans. Also, you can kill jobs from the web UI.

// maropu

On Thu, Aug 4, 2016 at 4:58 PM, Marco Colombo <[email protected]> wrote:

Hi all,

I have a question about how hive+spark handle data. I've started a new HiveContext and I'm extracting data from cassandra. I've configured spark.sql.shuffle.partitions=10.

Now, I have the following query:

    select d.id, avg(d.avg) from v_points d where id=90 group by id;

I see that 10 tasks are submitted and execution is fast. Every id in that table has 2000 samples.

But if I just add a new id, as in:

    select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by id;

it adds 663 tasks and the query does not end.

If I write the query with in (), like

    select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;

the query is fast again.

How can I get the 'execution plan' of the query? And also, how can I kill the long-running submitted tasks?

Thanks all!

--
---
Takeshi Yamamuro

--
Ing. Marco Colombo
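
As a rough illustration of how Yong reads the two plans in this thread: a Filter operator sitting above the Cassandra scan means Spark re-applies the predicate itself, even though the same predicate is listed under PushedFilters. The sketch below only inspects the text that `explain` prints, in the Spark 1.6 format quoted above. `has_spark_side_filter` is a hypothetical helper written for this note, not part of Spark or the Spark-Cassandra connector, and the plan strings are abbreviated copies of the ones Marco posted.

```python
# Hypothetical helper: detect a Spark-side Filter operator in explain() text.
# Assumes the Spark 1.6 physical plan layout shown in this thread.

IN_PLAN = """== Physical Plan ==
TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#85,count#86L])
      +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]
"""

OR_PLAN = """== Physical Plan ==
TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
+- TungstenExchange hashpartitioning(id#0L,10), None
   +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#92,count#93L])
      +- Filter ((id#0L = 94) || (id#0L = 2))
         +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]
"""


def has_spark_side_filter(plan_text):
    """Return True if any plan node is a Filter operator.

    "PushedFilters" on the Scan line does not count: that line starts
    with "Scan", so only a standalone Filter node matches.
    """
    for line in plan_text.splitlines():
        # Drop table borders and the "+- " tree prefix before the node name.
        node = line.strip().lstrip("|+- ")
        if node.startswith("Filter "):
            return True
    return False


if __name__ == "__main__":
    print("IN plan filtered in Spark:", has_spark_side_filter(IN_PLAN))
    print("OR plan filtered in Spark:", has_spark_side_filter(OR_PLAN))
```

For the two plans above this reports no Spark-side filter for the in query and one for the or query, which matches the slow, full-scan-like behaviour Marco observed.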

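
Yong's advice to use "=" or "in" on the partition key can also be followed when the list of ids is built programmatically. The following is a minimal sketch, assuming the v_points table and the id/avg columns from Marco's queries; `build_avg_query` is a hypothetical helper name, and the string it returns would be passed to `sqlCtx.sql(...)` as in Takeshi's message.

```python
def build_avg_query(ids, table="v_points"):
    """Build the aggregate query from this thread with an IN predicate.

    Hypothetical helper: it emits `id in (...)` instead of a chain of
    `id = a or id = b`, which is the form the thread reports as fast.
    """
    if not ids:
        raise ValueError("at least one id is required")
    # Coerce every value to int so only numeric literals are inlined.
    id_list = ", ".join(str(int(i)) for i in ids)
    return (
        f"select d.id, avg(d.avg) from {table} d "
        f"where id in ({id_list}) group by id"
    )


if __name__ == "__main__":
    print(build_avg_query([90, 2]))
```

Whether the predicate is actually handled by C* still depends on the connector and on id being a partition key, so it is worth checking the resulting plan with explain.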