Hello, I ran into a bug with Broadcast Hash Join in Spark 2.0 (running on EMR). If I set spark.sql.autoBroadcastJoinThreshold=-1, the join works; if I leave it at the default, it does not. When it fails, one of my joined columns is filled with very small Doubles.
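For anyone who wants to apply the same toggle from code instead of a --conf flag, here is a minimal sketch in Python. The helper name disable_auto_broadcast is hypothetical and not part of Spark; it only assumes an object shaped like a SparkSession, with conf.get and conf.set. The fallback value 10485760 is Spark's documented default for this setting.

```python
# Hypothetical helper (not from the original report): disable automatic
# broadcast joins on an existing SparkSession-like object.
BROADCAST_THRESHOLD_KEY = "spark.sql.autoBroadcastJoinThreshold"
DOCUMENTED_DEFAULT = "10485760"  # 10 MB, Spark's documented default


def disable_auto_broadcast(spark):
    """Set the threshold to -1 and return the value that was in effect before."""
    previous = spark.conf.get(BROADCAST_THRESHOLD_KEY, DOCUMENTED_DEFAULT)
    spark.conf.set(BROADCAST_THRESHOLD_KEY, "-1")
    return previous


# Usage with a real session (requires pyspark, so left as comments):
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.getOrCreate()
#   old_value = disable_auto_broadcast(spark)
#   joined = spx.join(vix, "datetime")   # spx and vix are the two small tables
#   joined.explain()                     # should no longer show BroadcastHashJoin
```

Returning the previous value makes it easy to restore the setting afterwards with spark.conf.set, so the workaround can be scoped to the one affected join.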
I’m joining two small tables: (datetime,spx) and (datetime,vix). Attached are the plans and debug.

================================================================
The (Default) Broken case:

+-------------+-----------+
|     datetime|        spx|
+-------------+-----------+
|1476907200000|2144.290039|
|1476820800000|2139.600098|
|1476734400000|     2126.5|
|1476475200000| 2132.97998|
|1476388800000|2132.550049|
|1476302400000|2139.179932|
|1476216000000| 2136.72998|
|1476129600000|2163.659912|
|1475870400000| 2153.73999|
|1475784000000| 2160.77002|
|1475697600000| 2159.72998|
|1475611200000| 2150.48999|
|1475524800000|2161.199951|
|1475265600000| 2168.27002|
|1475179200000|2151.129883|
|1475092800000|2171.370117|
|1475006400000|2159.929932|
|1474920000000|2146.100098|
|1474660800000|2164.689941|
|1474574400000|2177.179932|
+-------------+-----------+
only showing top 20 rows

+-------------+---------+
|     datetime|      vix|
+-------------+---------+
|1476907200000|    14.41|
|1476820800000|    15.28|
|1476734400000|16.209999|
|1476475200000|16.120001|
|1476388800000|16.690001|
|1476302400000|    15.91|
|1476216000000|    15.36|
|1476129600000|    13.38|
|1475870400000|    13.48|
|1475784000000|    12.84|
|1475697600000|    12.99|
|1475611200000|    13.63|
|1475524800000|    13.57|
|1475265600000|    13.29|
|1475179200000|    14.02|
|1475092800000|    12.39|
|1475006400000|     13.1|
|1474920000000|     14.5|
|1474660800000|    12.29|
|1474574400000|    12.02|
+-------------+---------+
only showing top 20 rows

2016-10-22T20:50:31.345+0000: [GC (Allocation Failure) [PSYoungGen: 704134K->79382K(945664K)] 823872K->199145K(3089408K), 0.0285894 secs] [Times: user=0.29 sys=0.04, real=0.03 secs]

== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
   :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :  +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :     +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
         +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
            +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('datetime))
:- Project [datetime#34L, spx#25]
:  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
:     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
:        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
:           +- Filter NOT (Date#0 = Date)
:              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [datetime#81L, vix#72]
   +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
      +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
         +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
            +- Filter NOT (Date#47 = Date)
               +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Analyzed Logical Plan ==
datetime: bigint, spx: double, vix: double
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
   :- Project [datetime#34L, spx#25]
   :  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
   :     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
   :        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
   :           +- Filter NOT (Date#0 = Date)
   :              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
   +- Project [datetime#81L, vix#72]
      +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
         +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
            +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
               +- Filter NOT (Date#47 = Date)
                  +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Optimized Logical Plan ==
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
   :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :  +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :     +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
   +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
      +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
         +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
   :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :  +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :     +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
         +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
            +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
+-------------+-----------+-------------------+
|     datetime|        spx|                vix|
+-------------+-----------+-------------------+
|1476907200000|2144.290039|7.296891096156E-312|
|1476820800000|2139.600098| 7.29646422344E-312|
|1476734400000|     2126.5| 7.29603735072E-312|
|1476475200000| 2132.97998|7.294756732566E-312|
|1476388800000|2132.550049| 7.29432985985E-312|
|1476302400000|2139.179932| 7.29390298713E-312|
|1476216000000| 2136.72998| 7.29347611441E-312|
|1476129600000|2163.659912|7.293049241694E-312|
|1475870400000| 2153.73999| 7.29176862354E-312|
|1475784000000| 2160.77002| 7.29134175082E-312|
|1475697600000| 2159.72998|7.290914878104E-312|
|1475611200000| 2150.48999|7.290488005386E-312|
|1475524800000|2161.199951| 7.29006113267E-312|
|1475265600000| 2168.27002|7.288780514514E-312|
|1475179200000|2151.129883|7.288353641796E-312|
|1475092800000|2171.370117| 7.28792676908E-312|
|1475006400000|2159.929932| 7.28749989636E-312|
|1474920000000|2146.100098| 7.28707302364E-312|
|1474660800000|2164.689941| 7.28579240549E-312|
|1474574400000|2177.179932| 7.28536553277E-312|
+-------------+-----------+-------------------+

=============================================================================
=============================================================================
=============================================================================
--conf,spark.sql.autoBroadcastJoinThreshold=-1,

+-------------+-----------+
|     datetime|        spx|
+-------------+-----------+
|1476907200000|2144.290039|
|1476820800000|2139.600098|
|1476734400000|     2126.5|
|1476475200000| 2132.97998|
|1476388800000|2132.550049|
|1476302400000|2139.179932|
|1476216000000| 2136.72998|
|1476129600000|2163.659912|
|1475870400000| 2153.73999|
|1475784000000| 2160.77002|
|1475697600000| 2159.72998|
|1475611200000| 2150.48999|
|1475524800000|2161.199951|
|1475265600000| 2168.27002|
|1475179200000|2151.129883|
|1475092800000|2171.370117|
|1475006400000|2159.929932|
|1474920000000|2146.100098|
|1474660800000|2164.689941|
|1474574400000|2177.179932|
+-------------+-----------+
only showing top 20 rows

+-------------+---------+
|     datetime|      vix|
+-------------+---------+
|1476907200000|    14.41|
|1476820800000|    15.28|
|1476734400000|16.209999|
|1476475200000|16.120001|
|1476388800000|16.690001|
|1476302400000|    15.91|
|1476216000000|    15.36|
|1476129600000|    13.38|
|1475870400000|    13.48|
|1475784000000|    12.84|
|1475697600000|    12.99|
|1475611200000|    13.63|
|1475524800000|    13.57|
|1475265600000|    13.29|
|1475179200000|    14.02|
|1475092800000|    12.39|
|1475006400000|     13.1|
|1474920000000|     14.5|
|1474660800000|    12.29|
|1474574400000|    12.02|
+-------------+---------+
only showing top 20 rows

== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
   :- *Sort [datetime#34L ASC], false, 0
   :  +- Exchange hashpartitioning(datetime#34L, 200)
   :     +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :        +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :           +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
   +- *Sort [datetime#81L ASC], false, 0
      +- Exchange hashpartitioning(datetime#81L, 200)
         +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
            +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
               +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
2016-10-22T20:58:15.994+0000: [GC (Allocation Failure) [PSYoungGen: 705910K->79079K(999936K)] 824644K->197829K(2974208K), 0.0294130 secs] [Times: user=0.26 sys=0.04, real=0.03 secs]
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('datetime))
:- Project [datetime#34L, spx#25]
:  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
:     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
:        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
:           +- Filter NOT (Date#0 = Date)
:              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [datetime#81L, vix#72]
   +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
      +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
         +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
            +- Filter NOT (Date#47 = Date)
               +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Analyzed Logical Plan ==
datetime: bigint, spx: double, vix: double
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
   :- Project [datetime#34L, spx#25]
   :  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
   :     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
   :        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
   :           +- Filter NOT (Date#0 = Date)
   :              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
   +- Project [datetime#81L, vix#72]
      +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
         +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
            +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
               +- Filter NOT (Date#47 = Date)
                  +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Optimized Logical Plan ==
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
   :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :  +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :     +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
   +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
      +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
         +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
   :- *Sort [datetime#34L ASC], false, 0
   :  +- Exchange hashpartitioning(datetime#34L, 200)
   :     +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :        +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :           +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
   +- *Sort [datetime#81L ASC], false, 0
      +- Exchange hashpartitioning(datetime#81L, 200)
         +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
            +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
               +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
+-------------+-----------+---------+
|     datetime|        spx|      vix|
+-------------+-----------+---------+
| 931550400000|1403.280029|17.959999|
| 955742400000|1356.560059|33.490002|
| 962308800000|1442.390015|19.700001|
| 967752000000|1517.680054|    16.84|
| 995054400000|1215.680054|21.139999|
|1028145600000| 911.619995|32.029999|
|1049832000000| 878.289978|27.129999|
|1088452800000|1133.349976|    16.07|
|1097265600000|1122.140015|    15.05|
|1102539600000|1182.810059|    13.19|
|1147809600000|1292.079956|    13.35|
|1162414800000|1367.810059|    11.51|
|1266526800000|    1106.75|20.629999|
|1314043200000|1123.819946|42.439999|
|1319227200000|    1238.25|    31.32|
|1331928000000|1404.170044|    14.43|
|1377201600000|1656.959961|    14.76|
|1378756800000|1671.709961|    15.63|
|1390597200000|1790.290039|17.879999|
|1400616000000|1872.829956|    12.96|
+-------------+-----------+---------+
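
One observation on the broken output, offered as a hypothesis rather than a diagnosis: the tiny vix values look like the 64-bit integer in the datetime column with its bits reinterpreted as a double. The Python sketch below checks that idea against the first three rows of the broken join result. The function name long_bits_as_double is my own, and the check says nothing about where inside Spark such a mix-up would happen.

```python
import struct


def long_bits_as_double(value):
    """Reinterpret the 8 bytes of a signed 64-bit integer as an IEEE 754 double."""
    return struct.unpack("<d", struct.pack("<q", value))[0]


# (datetime, vix) pairs copied from the first rows of the broken join output.
broken_rows = [
    (1476907200000, 7.296891096156e-312),
    (1476820800000, 7.29646422344e-312),
    (1476734400000, 7.29603735072e-312),
]

for datetime_ms, bad_vix in broken_rows:
    reinterpreted = long_bits_as_double(datetime_ms)
    # Each comparison is expected to hold if the hypothesis is right.
    print(datetime_ms, reinterpreted, reinterpreted == bad_vix)
```

If the comparisons hold, the vix column in the broadcast case is being read from the slot that holds the join key, which would point at how the broadcast side's rows are laid out or read back rather than at the CSV data itself.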