2.2.0 under Unreleased Versions in JIRA?

2017-07-16 Thread Jacek Laskowski
Hi,

Just noticed that the 2.2.0 label is still under Unreleased Versions in JIRA.
Since 2.2.0 is out, I think only 2.2.1 and 2.3.0 are valid there. Correct?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski




Re: 2.2.0 under Unreleased Versions in JIRA?

2017-07-16 Thread Sean Owen
Done, it just needed to be marked as released.

On Sun, Jul 16, 2017 at 12:03 PM Jacek Laskowski  wrote:

> Hi,
>
> Just noticed that the 2.2.0 label is still under Unreleased Versions in JIRA.
> Since 2.2.0 is out, I think only 2.2.1 and 2.3.0 are valid there. Correct?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>


Re: 2.2.0 under Unreleased Versions in JIRA?

2017-07-16 Thread Jacek Laskowski
Confirmed. Thanks a lot, Sean.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Jul 16, 2017 at 3:02 PM, Sean Owen  wrote:
> Done, it just needed to be marked as released.




Re: [SQL] Syntax "case when" isn't supported in JOIN

2017-07-16 Thread Chang Chen
It is tedious since we have lots of Hive SQL being migrated to Spark, and
this workaround is equivalent to inserting a Project between the Join
operator and its child.

Why not do it in PullOutNondeterministic?

Thanks
Chang

On Fri, Jul 14, 2017 at 5:29 PM, Liang-Chi Hsieh  wrote:

>
> A possible workaround is to add the rand column into tbl1 with a projection
> before the join.
>
> SELECT a.col1
> FROM (
>   SELECT col1,
>     CASE
>       WHEN col2 IS NULL
>         THEN cast(rand(9)*1000 - 99 as string)
>       ELSE col2
>     END AS col2
>   FROM tbl1) a
> LEFT OUTER JOIN tbl2 b
> ON a.col2 = b.col3;
>
>
>
> Chang Chen wrote
> > Hi Wenchen
> >
> > Yes, we also found that this error is caused by rand(). However, this is a
> > classic way to solve data skew in Hive. Is there any equivalent way in Spark?
> >
> > Thanks
> > Chang
> >
> > On Thu, Jul 13, 2017 at 8:25 PM, Wenchen Fan <cloud0fan@...> wrote:
> >
> >> It’s not about case when, but about rand(). Non-deterministic
> >> expressions are not allowed in a join condition.
> >>
> >> > On 13 Jul 2017, at 6:43 PM, wangshuang <cn_wss@...> wrote:
> >> >
> >> > I'm trying to execute Hive SQL on Spark SQL (also on the Spark Thrift
> >> > Server). To mitigate data skew, we use "case when" to handle nulls.
> >> > A simple SQL example follows:
> >> >
> >> >
> >> > SELECT a.col1
> >> > FROM tbl1 a
> >> > LEFT OUTER JOIN tbl2 b
> >> > ON
> >> > CASE
> >> >   WHEN a.col2 IS NULL
> >> >     THEN cast(rand(9)*1000 - 99 as string)
> >> >   ELSE a.col2
> >> > END
> >> >   = b.col3;
> >> >
> >> >
> >> > But I get the error:
> >> >
> >> > == Physical Plan ==
> >> > org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found:
> >> > (((CASE WHEN (a.`nav_tcdt` IS NULL) THEN CAST(((rand(9) * CAST(1000 AS DOUBLE)) - CAST(99L AS DOUBLE)) AS STRING) ELSE a.`nav_tcdt` END = c.`site_categ_id`) AND (CAST(a.`nav_tcd` AS INT) = 9)) AND (c.`cur_flag` = 1))
> >> > in operator Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN cast(((rand(9) * cast(1000 as double)) - cast(99 as double)) as string) ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26 as int) = 9)) && (cur_flag#77 = 1))
> >> > ;;
> >> > GlobalLimit 10
> >> > +- LocalLimit 10
> >> >    +- Aggregate [date_id#7, CASE WHEN (cast(city_id#10 as string) IN (cast(19596 as string),cast(20134 as string),cast(10997 as string)) && nav_tcdt#25 RLIKE ^[0-9]+$) THEN city_id#10 ELSE nav_tpa_id#21 END], [date_id#7]
> >> >       +- Filter (date_id#7 = 2017-07-12)
> >> >          +- Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN cast(((rand(9) * cast(1000 as double)) - cast(99 as double)) as string) ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26 as int) = 9)) && (cur_flag#77 = 1))
> >> >             :- SubqueryAlias a
> >> >             :  +- SubqueryAlias tmp_lifan_trfc_tpa_hive
> >> >             :     +- CatalogRelation `tmp`.`tmp_lifan_trfc_tpa_hive`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [date_id#7, chanl_id#8L, pltfm_id#9, city_id#10, sessn_id#11, gu_id#12, nav_refer_page_type_id#13, nav_refer_page_value#14, nav_refer_tpa#15, nav_refer_tpa_id#16, nav_refer_tpc#17, nav_refer_tpi#18, nav_page_type_id#19, nav_page_value#20, nav_tpa_id#21, nav_tpa#22, nav_tpc#23, nav_tpi#24, nav_tcdt#25, nav_tcd#26, nav_tci#27, nav_tce#28, detl_refer_page_type_id#29, detl_refer_page_value#30, ... 33 more fields]
> >> >             +- SubqueryAlias c
> >> >                +- SubqueryAlias dim_site_categ_ext
> >> >                   +- CatalogRelation `dw`.`dim_site_categ_ext`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [site_categ_skid#64L, site_categ_type#65, site_categ_code#66, site_categ_name#67, site_categ_parnt_skid#68L, site_categ_kywrd#69, leaf_flg#70L, sort_seq#71L, site_categ_srch_name#72, vsbl_flg#73, delet_flag#74, etl_batch_id#75L, updt_time#76, cur_flag#77, bkgrnd_categ_skid#78L, bkgrnd_categ_id#79L, site_categ_id#80, site_categ_parnt_id#81]
> >> >
> >> > Does Spark SQL not support "case when" syntax in a JOIN? Additionally,
> >> > my Spark version is 2.2.0.
> >> > Any help would be greatly appreciated.
> >> >
> >> >
> >> >
> >> >
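
For concreteness, Liang-Chi's SQL workaround above translates to the
DataFrame API roughly as follows. This is only a sketch: `df1` and `df2`
are hypothetical stand-ins for tbl1 and tbl2, and `spark` is assumed to be
an active SparkSession.

    import org.apache.spark.sql.functions._

    // Draw rand() once per row of tbl1 in a projection *before* the join,
    // so the join condition itself stays deterministic.
    val salted = df1.withColumn("col2",
      when(col("col2").isNull, (rand(9) * 1000 - 99).cast("string"))
        .otherwise(col("col2")))

    val result = salted
      .join(df2, salted("col2") === df2("col3"), "left_outer")
      .select(salted("col1"))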

Re: [SQL] Syntax "case when" isn't supported in JOIN

2017-07-16 Thread Xiao Li
If the join condition is non-deterministic, pushing it down into the
underlying Project will change the semantics. Thus, we are unable to do it
in PullOutNondeterministic. Users can do it manually if they do not care
about the semantics difference.

Thanks,

Xiao



2017-07-16 20:07 GMT-07:00 Chang Chen :

> It is tedious since we have lots of Hive SQL being migrated to Spark, and
> this workaround is equivalent to inserting a Project between the Join
> operator and its child.
>
> Why not do it in PullOutNondeterministic?
>
> Thanks
> Chang

Re: How to tune the performance of TPC-H query 5 within Spark

2017-07-16 Thread 163
I changed the UDF, but the performance still seems slow. What else can I do?


> On 14 Jul 2017, at 8:34 PM, Wenchen Fan wrote:
> 
> Try to replace your UDF with Spark built-in expressions; it should be as
> simple as `$"x" * (lit(1) - $"y")`.
> 
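
Applied to the query quoted below, that suggestion amounts to deleting the
`decrease` UDF and writing the arithmetic inline, so Catalyst can optimize
and code-generate the expression instead of treating the UDF as a black
box. A sketch of the one line that changes (assuming the same column names
and `import org.apache.spark.sql.functions._`):

    .select($"n_name",
      ($"l_extendedprice" * (lit(1) - $"l_discount")).as("value"))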
>> On 14 Jul 2017, at 5:46 PM, 163 wrote:
>> 
>> I rewrote TPC-H query 5 as a DataFrame program:
>> 
>> val forders = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>>   // date literals quoted so they are parsed as dates, not integer arithmetic
>>   .filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'")
>>   .select("o_custkey", "o_orderkey")
>> val flineitem = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
>> val fcustomer = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
>> val fsupplier = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
>> val fregion = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>>   .where("r_name = 'ASIA'")
>>   .select($"r_regionkey")
>> val fnation = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>> val res = flineitem
>>   .join(forders, $"l_orderkey" === forders("o_orderkey"))
>>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") &&
>>     $"c_nationkey" === fsupplier("s_nationkey"))
>>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>>   .groupBy($"n_name")
>>   .agg(sum($"value").as("revenue"))
>>   .sort($"revenue".desc)
>>   .show()
>> 
>> My environment is one master (HDFS namenode) and four workers (HDFS
>> datanodes), each with 40 cores and 128 GB of memory. The TPC-H 100 GB
>> dataset is stored on HDFS in Parquet format.
>> The query executed in about 1.5 minutes. I found that reading these 6
>> tables with spark.read.parquet is sequential; how can I make the reads
>> run in parallel?
>> I've already set data locality, spark.default.parallelism, and
>> spark.serializer, and I use the G1 GC, but the runtime is still not
>> reduced.
>> Is there any other advice for tuning this performance?
>> Thank you.
>> 
>> Wenting He
>> 
>
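
One further idea, offered only as a sketch and not as advice from the
thread: each spark.read.parquet call does some driver-side setup (Parquet
schema discovery against HDFS) before returning, and the six calls run one
after another. If that sequential setup is what the timeline shows, the
reads can be issued concurrently with ordinary Scala futures; `spark` is
assumed to be the active SparkSession.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration
    import scala.concurrent.ExecutionContext.Implicits.global

    val base = "hdfs://dell127:20500/SparkParquetDoubleTimestamp100G"
    val tables = Seq("orders", "lineitem", "customer", "supplier", "region", "nation")

    // Kick off all six reads at once; each Future yields a DataFrame.
    val dfs = Await.result(
      Future.sequence(tables.map(t => Future(spark.read.parquet(s"$base/$t")))),
      Duration.Inf)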