Re: Apache Spark 3.2.2 Release?
+1 (non-binding) Thanks!

On Thu, Jul 7, 2022 at 7:00 AM Yang,Jie(INF) wrote:
> +1 (non-binding) Thank you Dongjoon ~

On Thursday, July 7, 2022 at 16:28, Ruifeng Zheng <ruife...@foxmail.com> wrote to dev:
> +1 thank you Dongjoon!

On Thu, Jul 7, 2022 at 04:16 PM, Yikun Jiang wrote (To: Mridul Muralidharan; Cc: Gengliang Wang, Cheng Su, Maxim Gekk, Wenchen Fan, Xiao Li, Xinrong Meng, Yuming Wang, dev):
> +1 (non-binding)
> Thanks!
> Regards,
> Yikun

On Thu, Jul 7, 2022 at 1:57 PM Mridul Muralidharan wrote:
> +1
> Thanks for driving this Dongjoon!
> Regards,
> Mridul

On Thu, Jul 7, 2022 at 12:36 AM Gengliang Wang wrote:
> +1. Thank you, Dongjoon.

On Wed, Jul 6, 2022 at 10:21 PM Wenchen Fan wrote:
> +1

On Thu, Jul 7, 2022 at 10:41 AM Xinrong Meng wrote:
> +1 Thanks!
> Xinrong Meng
> Software Engineer
> Databricks

On Wed, Jul 6, 2022 at 7:25 PM Xiao Li wrote:
> +1
> Xiao

On Wednesday, July 6, 2022 at 19:16, Cheng Su wrote:
> +1 (non-binding)
> Thanks,
> Cheng Su

On Wed, Jul 6, 2022 at 6:01 PM Yuming Wang wrote:
> +1

On Thu, Jul 7, 2022 at 5:53 AM Maxim Gekk wrote:
> +1

On Thu, Jul 7, 2022 at 12:26 AM John Zhuge wrote:
> +1 Thanks for the effort!

On Wed, Jul 6, 2022 at 2:23 PM Bjørn Jørgensen wrote:
> +1

On Wed, Jul 6, 2022 at 11:05 PM Hyukjin Kwon wrote:
> Yeah +1

On Thu, Jul 7, 2022 at 5:40 AM Dongjoon Hyun wrote:
> Hi, All.
>
> Since the Apache Spark 3.2.1 tag creation (Jan 19), 197 new patches,
> including 11 correctness patches, have arrived at branch-3.2.
>
> Shall we make a new release, Apache Spark 3.2.2, as the third release
> in the 3.2 line? I'd like to volunteer as the release manager for
> Apache Spark 3.2.2. I'm thinking about starting the first RC next week.
>
> $ git log --oneline v3.2.1..HEAD | wc -l
> 197
>
> # Correctness issues
>
> SPARK-38075  Hive script transform with order by and limit will return fake rows
> SPARK-38204  All state operators are at a risk of inconsistency between state partitioning and operator partitioning
> SPARK-38309  SHS has incorrect percentiles for shuffle read bytes and shuffle total blocks metrics
> SPARK-38320  (flat)MapGroupsWithState can timeout groups which just received inputs in the same microbatch
> SPARK-38614  After Spark update, df.show() shows incorrect F.percent_rank results
> SPARK-38655  OffsetWindowFunctionFrameBase cannot find the offset row whose input is not null
> SPARK-38684  Stream-stream outer join has a possible correctness issue due to weakly read consistent on outer iterators
> SPARK-39061  Incorrect results or NPE when using Inline function against an array of dynamically created structs
> SPARK-39107  Silent change in regexp_replace's handling of empty strings
> SPARK-39259  Timestamps returned by now() and equivalent functions are not consistent in subqueries
> SPARK-39293  The accumulator of ArrayAggregate should copy the intermediate result if string, struct, array, or map
>
> Best,
> Dongjoon.
Re: Scala left join with multiple columns Join condition is missing or trivial. Use the CROSS JOIN syntax to allow cartesian products between these relations.
You probably don't want null safe equals (<=>) with a left join.

On Mon, Apr 3, 2017 at 5:46 PM gjohnson35 wrote:
> The join condition with && is throwing an exception:
>
>   val df = baseDF.join(mccDF, mccDF("medical_claim_id") <=> baseDF("medical_claim_id")
>       && mccDF("medical_claim_detail_id") <=> baseDF("medical_claim_detail_id"), "left")
>     .join(revCdDF, revCdDF("revenue_code_padded_str") <=> mccDF("mcc_code"), "left")
>     .select(baseDF("medical_claim_id"), baseDF("medical_claim_detail_id"),
>       baseDF("revenue_code"), baseDF("rev_code_distinct_count"),
>       baseDF("rtos_1_1_count"), baseDF("rtos_1_0_count"),
>       baseDF("er_visit_flag"), baseDF("observation_stay_flag"),
>       revCdDF("rtos_2_code"), revCdDF("rtos_2_hierarchy"))
>     .where(revCdDF("rtos_2_code").between(8, 27).isNotNull)
>     .groupBy(
>       baseDF("medical_claim_id"),
>       baseDF("medical_claim_detail_id")
>     )
>     .agg(min(revCdDF("rtos_2_code").alias("min_rtos_2_8_thru_27")),
>       min(revCdDF("rtos_2_hierarchy").alias("min_rtos_2_8_thru_27_hier")))
>
> This query runs fine:
>
>   val df = baseDF.join(mccDF, mccDF("medical_claim_id") <=> baseDF("medical_claim_id"), "left")
>     .join(mccDF, mccDF("medical_claim_detail_id") <=> baseDF("medical_claim_detail_id"), "left")
>     .join(revCdDF, revCdDF("revenue_code_padded_str") <=> mccDF("mcc_code"), "left")
>     .select(baseDF("medical_claim_id"), baseDF("medical_claim_detail_id"),
>       baseDF("revenue_code"), baseDF("rev_code_distinct_count"),
>       baseDF("rtos_1_1_count"), baseDF("rtos_1_0_count"),
>       baseDF("er_visit_flag"), baseDF("observation_stay_flag"),
>       revCdDF("rtos_2_code"), revCdDF("rtos_2_hierarchy"))
>     .where(revCdDF("rtos_2_code").between(8, 27).isNotNull)
>     .groupBy(
>       baseDF("medical_claim_id"),
>       baseDF("medical_claim_detail_id")
>     )
>     .agg(min(revCdDF("rtos_2_code").alias("min_rtos_2_8_thru_27")),
>       min(revCdDF("rtos_2_hierarchy").alias("min_rtos_2_8_thru_27_hier")))
>
> If I remove the multiple columns in the join and create a join statement
> for each one then the exception goes away. Is there a better way to join
> multiple columns?
>
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Scala-left-join-with-multiple-columns-Join-condition-is-missing-or-trivial-Use-the-CROSS-JOIN-syntax-tp21297.html
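A minimal sketch of the kind of multi-column left join the reply points toward, assuming the baseDF and mccDF DataFrames from the question above; it uses plain === instead of the null-safe <=>:

    // Join on both key columns in a single left join.
    val joined = baseDF.join(
      mccDF,
      (baseDF("medical_claim_id") === mccDF("medical_claim_id")) &&
        (baseDF("medical_claim_detail_id") === mccDF("medical_claim_detail_id")),
      "left")

    // If both sides use the same column names, joining on a Seq of names is
    // simpler and keeps a single copy of each key column in the result.
    val joined2 = baseDF.join(mccDF, Seq("medical_claim_id", "medical_claim_detail_id"), "left")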
Re: [discuss] ending support for Java 7 in Spark 2.0
+1 on removing Java 7 and Scala 2.10 support.

It looks entirely possible to support Java 8 containers in a YARN cluster that otherwise runs Java 7 (example code for an alternate JAVA_HOME: https://issues.apache.org/jira/secure/attachment/12671739/YARN-1964.patch), so there should be no big problem. Even if that somehow doesn't work, I'm still +1, as the benefits are so large.

I'd also like to point out that it is completely trivial to have multiple versions of Spark running concurrently on a YARN cluster. At my previous (extremely large) employer we had almost every release since 1.0 installed, with the latest as the default and production apps pinned to a specific version. So if you want to keep using some Scala 2.10-only library, or just don't want to migrate to Java 8, feel free to continue using Spark 1.x for those applications.

IMHO we need to move on from EOL'd software to make room for the future (Java 9, Scala 2.12), and Spark 2.0 is the only chance we are going to have to do so for a long time.

--Andrew

On Thu, Mar 24, 2016 at 10:55 PM, Mridul Muralidharan wrote:
> I do agree w.r.t. Scala 2.10 as well; similar arguments apply (though there
> is a nuanced difference - source compatibility for Scala vs. binary
> compatibility for Java).
> Was there a proposal which did not go through? Not sure if I missed it.
>
> Regards,
> Mridul
>
> On Thursday, March 24, 2016, Koert Kuipers wrote:
>> I think that logic is reasonable, but then the same should also apply to
>> Scala 2.10, which is also unmaintained/unsupported at this point (basically
>> has been since March 2015, except for one hotfix due to a license
>> incompatibility).
>>
>> Who wants to support Scala 2.10 three years after they did the last
>> maintenance release?
>>
>> On Thu, Mar 24, 2016 at 9:59 PM, Mridul Muralidharan wrote:
>>> Removing compatibility (with the JDK, etc.) can be done with a major
>>> release - given that 7 was EOL'd a while back and is now unsupported, we
>>> have to decide if we drop support for it in 2.0 or 3.0 (2+ years from now).
>>>
>>> Given the functionality and performance benefits of going to JDK 8, future
>>> enhancements relevant in the 2.x timeframe (Scala, dependencies) which
>>> require it, and simplicity w.r.t. code, test, and support, it looks like a
>>> good checkpoint to drop JDK 7 support.
>>>
>>> As already mentioned in the thread, existing YARN clusters are
>>> unaffected if they want to continue running JDK 7 and yet use
>>> Spark 2 (install JDK 8 on all nodes and use it via JAVA_HOME, or worst case
>>> distribute JDK 8 as an archive - suboptimal).
>>> I am unsure about Mesos (standalone might be the easier upgrade, I guess?).
>>>
>>> The proposal is for the 1.6.x line to continue to be supported with
>>> critical fixes; newer features will require 2.x and hence JDK 8.
>>>
>>> Regards,
>>> Mridul
>>>
>>> On Thursday, March 24, 2016, Marcelo Vanzin wrote:
>>>> On Thu, Mar 24, 2016 at 4:50 PM, Reynold Xin wrote:
>>>>> If you want to go down that route, you should also ask somebody who has
>>>>> had experience managing a large organization's applications and try to
>>>>> update the Scala version.
>>>>
>>>> I understand both sides. But if you look at what I've been asking since
>>>> the beginning, it's all about the cost and benefits of dropping support
>>>> for Java 1.7.
>>>>
>>>> The biggest argument in your original e-mail is about testing. And the
>>>> testing cost is much bigger for supporting Scala 2.10 than it is for
>>>> supporting Java 1.7.
>>>> If you read one of my earlier replies, it should even be possible to do
>>>> everything in a single job - compile for Java 7 and still be able to test
>>>> things in 1.8, including lambdas, which seems to be the main thing you
>>>> were worried about.
>>>>
>>>>> On Thu, Mar 24, 2016 at 4:48 PM, Marcelo Vanzin wrote:
>>>>>> On Thu, Mar 24, 2016 at 4:46 PM, Reynold Xin wrote:
>>>>>>> Actually it's *way* harder to upgrade Scala from 2.10 to 2.11 than to
>>>>>>> upgrade the JVM runtime from 7 to 8, because Scala 2.10 and 2.11 are
>>>>>>> not binary compatible, whereas JVM 7 and 8 are binary compatible
>>>>>>> except in certain esoteric cases.
>>>>>>
>>>>>> True, but ask anyone who manages a large cluster how long it would
>>>>>> take them to upgrade the JDK across their cluster and validate all
>>>>>> their applications and everything... binary compatibility is a tiny
>>>>>> drop in that bucket.
>>>>>>
>>>>>> --
>>>>>> Marcelo
>>>>
>>>> --
>>>> Marcelo
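Mridul's point above about existing YARN clusters staying on JDK 7 while Spark 2 runs on JDK 8 comes down to pointing the containers' JAVA_HOME at a per-node Java 8 install. A minimal sketch of the standard per-environment-variable settings; the /opt/jdk1.8.0 path is an assumption:

    import org.apache.spark.SparkConf

    // Sketch only: run the YARN application master and executors under Java 8
    // even if the NodeManagers themselves run Java 7. The JDK path is assumed.
    val conf = new SparkConf()
      .set("spark.yarn.appMasterEnv.JAVA_HOME", "/opt/jdk1.8.0")
      .set("spark.executorEnv.JAVA_HOME", "/opt/jdk1.8.0")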
Re: SparkSQL - Limit pushdown on BroadcastHashJoin
While you can't automatically push the limit *through* the join, we could push
it *into* the join (stop processing after generating 10 records). I believe
that is what Rajesh is suggesting.

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier
<hvanhov...@questtec.nl> wrote:
> I am not sure if you can push a limit through a join. This becomes
> problematic if not all keys are present on both sides; in such a case a
> limit can produce fewer rows than the set limit.
>
> This might be a rare case in which whole-stage codegen is slower, due to
> the fact that we need to buffer the result of such a stage. You could try
> to disable it by setting "spark.sql.codegen.wholeStage" to false.
>
> 2016-04-12 14:32 GMT+02:00 Rajesh Balamohan:
>> Hi,
>>
>> I ran the following query in Spark (latest master codebase) and it took a
>> lot of time to complete even though it was a broadcast hash join.
>>
>> It appears that the limit computation is done only after computing the
>> complete join condition. Shouldn't the limit condition be pushed to
>> BroadcastHashJoin (wherein it would have to stop processing after
>> generating 10 rows)? Please let me know if my understanding of this is
>> wrong.
>>
>>   select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;
>>
>>   == Physical Plan ==
>>   CollectLimit 10
>>   +- WholeStageCodegen
>>      :  +- Project [l_partkey#893]
>>      :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
>>      :        :- Project [l_partkey#893]
>>      :        :  +- Filter isnotnull(l_partkey#893)
>>      :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct
>>      :        +- INPUT
>>      +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
>>         +- WholeStageCodegen
>>            :  +- Project [ps_partkey#908]
>>            :     +- Filter isnotnull(ps_partkey#908)
>>            :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct
>>
>> --
>> ~Rajesh.B
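A minimal sketch of the codegen toggle Herman suggests, assuming a SparkSession named spark with the lineitem and partsupp tables already registered:

    // Compare the physical plan and runtime with and without whole-stage codegen.
    val q = "select l_partkey from lineitem, partsupp where ps_partkey = l_partkey limit 10"

    spark.sql(q).explain()                                   // default: plan wrapped in WholeStageCodegen
    spark.conf.set("spark.sql.codegen.wholeStage", "false")  // toggle suggested in the reply
    spark.sql(q).explain()                                   // same query with codegen disabled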
Re: HDFS as Shuffle Service
Yes, HDFS has serious problems with creating lots of files. But we can always
just create a single merged file on HDFS per task.

On Apr 28, 2016 11:17 AM, "Reynold Xin" wrote:
> Hm, while this is an attractive idea in theory, in practice I think you are
> substantially overestimating HDFS' ability to handle a lot of small,
> ephemeral files. It has never really been optimized for that use case.
>
> On Thu, Apr 28, 2016 at 11:15 AM, Michael Gummelt wrote:
>> > if after a work-load burst your cluster dynamically changes from 1
>> > workers to 1000, will the typical HDFS replication factor be sufficient
>> > to retain access to the shuffle files in HDFS
>>
>> HDFS isn't resizing. Spark is. HDFS files should be HA and durable.
>>
>> On Thu, Apr 28, 2016 at 11:08 AM, Mark Hamstra wrote:
>>> Yes, replicated and distributed shuffle materializations are a key
>>> requirement to maintain performance in a fully elastic cluster where
>>> Executors aren't just reallocated across an essentially fixed number of
>>> Worker nodes, but rather the number of Workers itself is dynamic.
>>> Retaining the file interface to those shuffle materializations while also
>>> using HDFS for spark.local.dirs has a certain amount of attraction, but
>>> I also wonder whether a typical HDFS deployment is really sufficient to
>>> handle this kind of elastic cluster scaling. For instance, and assuming
>>> HDFS co-located on worker nodes: if after a work-load burst your cluster
>>> dynamically changes from 1 workers to 1000, will the typical HDFS
>>> replication factor be sufficient to retain access to the shuffle files in
>>> HDFS, or will we instead be seeing numerous FetchFailure exceptions, tasks
>>> recomputed or stages aborted, etc., so that the net effect is not all that
>>> much different than if the shuffle files had not been relocated to HDFS
>>> and the Executors or ShuffleService instances had just disappeared along
>>> with the worker nodes?
>>>
>>> On Thu, Apr 28, 2016 at 10:46 AM, Michael Gummelt wrote:
>>>> > Why would you run the shuffle service on 10K nodes but Spark executors
>>>> > on just 100 nodes? wouldn't you also run that service just on the 100
>>>> > nodes?
>>>>
>>>> We have to start the service beforehand, out of band, and we don't know
>>>> a priori where the Spark executors will land. Those 100 executors could
>>>> land on any of the 10K nodes.
>>>>
>>>> > What does plumbing it through HDFS buy you in comparison?
>>>>
>>>> It drops the shuffle service requirement, which is HUGE. It means Spark
>>>> can completely vacate the machine when it's not in use, which is crucial
>>>> for a large, multi-tenant cluster. ShuffledRDDs can now read the map
>>>> files from HDFS, rather than from the ancestor executors, which means we
>>>> can shut executors down immediately after the shuffle files are written.
>>>>
>>>> > There's some additional overhead and if anything you lose some control
>>>> > over locality, in a context where I presume HDFS itself is storing data
>>>> > on much more than the 100 Spark nodes.
>>>>
>>>> Write locality would be sacrificed, but the descendent executors were
>>>> already doing a remote read (they have to read from multiple ancestor
>>>> executors), so there's no additional cost in read locality. In fact, if
>>>> we take advantage of HDFS's favored-node feature, we could make it likely
>>>> that all map files for a given partition land on the same node, so the
>>>> descendent executor would never have to do a remote read! We'd
>>>> effectively shift the remote IO from the read side to the write side, for
>>>> theoretically no change in performance.
>>>> In summary:
>>>>
>>>> Advantages:
>>>> - No shuffle service dependency (increased utilization, decreased
>>>>   management cost)
>>>> - Shut executors down immediately after shuffle files are written,
>>>>   rather than waiting for a timeout (increased utilization)
>>>> - HDFS is HA, so shuffle files survive a node failure, which isn't true
>>>>   for the shuffle service (decreased latency during failures)
>>>> - Potential ability to parallelize shuffle file reads if we write a new
>>>>   shuffle iterator (decreased latency)
>>>>
>>>> Disadvantages:
>>>> - Increased write latency (but potentially not, if we implement it
>>>>   efficiently; see above)
>>>> - Would need some sort of GC on HDFS shuffle files
>>>>
>>>> On Thu, Apr 28, 2016 at 1:36 AM, Sean Owen wrote:
>>>>> Why would you run the shuffle service on 10K nodes but Spark executors
>>>>> on just 100 nodes? Wouldn't you also run that service just on the 100
>>>>> nodes?
>>>>>
>>>>> What does plumbing it through HDFS buy you in comparison? There's some
>>>>> additional overhead and if anything you lose some control over
>>>>> locality, in a context where I presume HDFS itself is storing data on
>>>>> much more than the 100 Spark nodes.
>>>>>
>>>>> On Thu, Apr 28, 2016 at 1:34 AM, Michael Gummelt <mgumm...@mesosphere.io> wrote:
>>>>>> Are you suggesting to have the shuffle service persist and fetch data
>>>>>> with HDFS, or skip the shuffle service altogether
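For context on the shuffle-service dependency being debated above, a minimal sketch of the settings involved today; this is background only, not part of the HDFS proposal:

    import org.apache.spark.SparkConf

    // Sketch only: today's dynamic allocation relies on an external shuffle
    // service running on every node to keep serving map outputs after an
    // executor is released - the dependency the HDFS idea aims to drop so
    // machines can be vacated entirely.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")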