It seems the performance difference comes from `CassandraSourceRelation`. I'm not familiar with the implementation, but my guess is that the `IN` filter is pushed down into the datasource as a predicate the connector can serve directly, while the `OR` filter is not.
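As a quick check, you can compare how each predicate reaches the connector straight from the shell. This is only a minimal sketch against the Spark 1.6-era API your plans show; the `v_points` table and the two queries are taken from your mail, and the connector behavior described in the comments is my assumption, not something I have verified:

  // spark-shell sketch; sqlContext and the v_points registration are
  // assumed to already exist, as in the original thread.
  import sqlContext.implicits._  // for the $"..." column syntax

  val withOr = sqlContext.sql(
    "select d.id, avg(d.avg) from v_points d where id = 90 or id = 2 group by id")
  val withIn = sqlContext.sql(
    "select d.id, avg(d.avg) from v_points d where id in (90, 2) group by id")

  // The OR plan keeps a residual Filter node above the Cassandra scan,
  // while the IN plan pushes a single In(id, ...) filter, presumably one
  // the connector can answer with direct partition-key lookups.
  withOr.explain()
  withIn.explain()

  // If OR is the slow path, rewriting it with isin should hit the same
  // fast path as the IN query:
  sqlContext.table("v_points")
    .filter($"id".isin(90, 2))
    .groupBy("id")
    .avg("avg")
    .explain()

If the `isin` version shows the same pushed `In` filter, that also gives you a workaround until the `OR` case is handled.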
You'd be better off checking the performance metrics in the web UI.

// maropu

On Thu, Aug 4, 2016 at 8:41 PM, Marco Colombo <ing.marco.colo...@gmail.com> wrote:

> Ok, thanks.
> The two plans are very similar.
>
> With the IN condition:
>
> == Physical Plan ==
> TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
> +- TungstenExchange hashpartitioning(id#0L,10), None
>    +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#85,count#86L])
>       +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]
>
> With the OR condition:
>
> == Physical Plan ==
> TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
> +- TungstenExchange hashpartitioning(id#0L,10), None
>    +- TungstenAggregate(key=[id#0L], functions=[(avg(avg#2),mode=Partial,isDistinct=false)], output=[id#0L,sum#92,count#93L])
>       +- Filter ((id#0L = 94) || (id#0L = 2))
>          +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]
>
> Filters are pushed down in both cases, so I cannot figure out why the OR
> version extracts so much data. It's like a full table scan.
>
> Any advice?
>
> Thanks!
>
> 2016-08-04 13:25 GMT+02:00 Takeshi Yamamuro <linguin....@gmail.com>:
>
>> Hi,
>>
>> Please type `sqlCtx.sql("select * .... ").explain` to show execution
>> plans.
>> Also, you can kill jobs from the web UI.
>>
>> // maropu
>>
>> On Thu, Aug 4, 2016 at 4:58 PM, Marco Colombo <
>> ing.marco.colo...@gmail.com> wrote:
>>
>>> Hi all, I have a question on how Hive and Spark handle data.
>>>
>>> I've started a new HiveContext and I'm extracting data from Cassandra.
>>> I've configured spark.sql.shuffle.partitions=10.
>>> Now, I have the following query:
>>>
>>> select d.id, avg(d.avg) from v_points d where id=90 group by id;
>>>
>>> I see that 10 tasks are submitted and execution is fast. Every id in
>>> that table has 2000 samples.
>>>
>>> But if I just add a second id, as in:
>>>
>>> select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by id;
>>>
>>> it adds 663 tasks and the query does not finish.
>>>
>>> If I write the query with in (), like:
>>>
>>> select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;
>>>
>>> the query is again fast.
>>>
>>> How can I get the 'execution plan' of the query?
>>>
>>> And also, how can I kill the long-running submitted tasks?
>>>
>>> Thanks all!
>>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
> --
> Ing. Marco Colombo

--
---
Takeshi Yamamuro