Re: [build system] github fetches timing out
On Tue, Mar 9, 2021 at 3:32 PM shane knapp ☠ wrote:

> it looks like over the past few days the master/branch builds have been
> timing out... this hasn't happened in a few years, and honestly the last
> times this happened there was nothing that either i, or github could do
> about it. it cleared up after a number of weeks, and we were never able to
> pinpoint the root cause.
>
> we're not hitting a github api ratelimit, and i'm able to successfully run
> the git commands on worker nodes on the command line as the jenkins user.
>
> example:
>
> https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-3.0-test-maven-hadoop-2.7-hive-2.3-jdk-11/1014/console
>
> i wish i had a more concrete answer or solution for what's going on...
> i'll continue to investigate as best i can today, and if this continues,
> i'll re-open my issue w/github and see if they can shed any light on the
> situation.
>
> shane
> --
> Shane Knapp
> Computer Guy / Voice of Reason
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
Resolving generated expressions in catalyst
Hey everyone!

I'm trying to implement a custom Catalyst optimization that I think may be useful to others who make frequent use of the arrays_overlap and array_contains functions in joins.

Consider this first query joining on overlapping arrays:

```
import org.apache.spark.sql.functions._

val left = Seq((Seq(1, 2, 3, 4), "hi")).toDF("arr", "word")
val right = Seq((Seq(2, 5), "bye")).toDF("arr", "word")

// This results in a cartesian product in the physical plan if the tables are sufficiently large
val naiveJoin = left.join(right, arrays_overlap(left("arr"), right("arr")))
```

We can transform it into an equivalent query like this:

```
// This will result in a non-cartesian product join
val fastJoin = {
  left.withColumn("explode_larr", explode(left("arr"))).as("__lp").join(
    right.withColumn("explode_rarr", explode(col("arr"))).as("__rp"),
    col("explode_larr") === col("explode_rarr")
  ).drop("explode_larr", "explode_rarr").distinct
}
```

I've implemented a first attempt at this optimization on my fork, but I'm having difficulty figuring out how to resolve the attributes on the exploded columns:

https://github.com/nvander1/spark/commit/711184f98774b7ac46fcfdf4e28e2d71041d89e1

Examining the logical tree of fastJoin:

```
00 Deduplicate [arr#617, arr#626, word#618, word#627]
01 +- Project [arr#617, word#618, arr#626, word#627]
02    +- Join Inner, (explode_larr#643 = explode_rarr#648)
03       :- SubqueryAlias `__lp`
04       :  +- Project [arr#617, word#618, explode_larr#643]
05       :     +- Generate explode(arr#617), false, [explode_larr#643]
06       :        +- Project [_1#614 AS arr#617, _2#615 AS word#618]
07       :           +- LocalRelation [_1#614, _2#615]
08       +- SubqueryAlias `__rp`
09          +- Project [arr#626, word#627, explode_rarr#648]
10             +- Generate explode(arr#626), false, [explode_rarr#648]
11                +- Project [_1#623 AS arr#626, _2#624 AS word#627]
12                   +- LocalRelation [_1#623, _2#624]
```

This is the logical tree of my implementation thus far (note the unresolved attributes):

```
'Deduplicate
+- 'Project [arr#2143, word#2144, arr#2152, word#2153]
   +- 'Join Inner, ('explode_larr = 'explode_rarr)
      :- 'SubqueryAlias `__lp`
      :  +- 'Project [arr#2143, word#2144, 'explode_larr]
      :     +- 'Generate explode(arr#2143), false, explode_larr
      :        +- LocalRelation [arr#2143, word#2144]
      +- 'SubqueryAlias `__rp`
         +- 'Project [arr#2152, word#2153, 'explode_rarr]
            +- 'Generate explode(arr#2152), false, explode_rarr
               +- LocalRelation [arr#2152, word#2153]
```

Related information (similar cases):
https://issues.apache.org/jira/projects/SPARK/issues/SPARK-27359?filter=addedrecently
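One way to get past the unresolved 'explode_larr / 'explode_rarr placeholders (not necessarily what the linked commit does) is to construct the generator output attribute up front when building the Generate node, so the join condition can reference it by exprId instead of by name. A minimal sketch, assuming Spark 2.4+ Catalyst internals; explodeSide and outName are illustrative names, not existing helpers:

```scala
// Minimal sketch, assuming Spark 2.4+/3.x Catalyst internals; not the code from the linked commit.
// Builds one side of the rewritten join with a pre-constructed generator output attribute,
// so the join condition can use an AttributeReference instead of an UnresolvedAttribute.
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Explode}
import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project}
import org.apache.spark.sql.types.ArrayType

def explodeSide(child: LogicalPlan, arr: Attribute, outName: String): (LogicalPlan, Attribute) = {
  // Create the generator's output attribute ourselves, with a fresh exprId.
  val elementType = arr.dataType.asInstanceOf[ArrayType].elementType
  val exploded = AttributeReference(outName, elementType, nullable = true)()

  val generate = Generate(
    generator = Explode(arr),
    unrequiredChildIndex = Nil,
    outer = false,
    qualifier = None,
    generatorOutput = Seq(exploded), // already resolved, no 'explode_larr placeholder
    child = child)

  // Keep the original columns plus the exploded element, mirroring the Project nodes in fastJoin's plan.
  (Project(child.output :+ exploded, generate), exploded)
}
```

A rule along these lines would call this once per join side, build the inner Join with EqualTo over the two returned attributes, and wrap the result in a Project plus Deduplicate over the original outputs, as in the resolved fastJoin plan above.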
Re: Why Snappy Compression?
Thank you for the detailed response. This is helpful. I'll read your article, and test my data as you've described.

On Tue, Aug 26, 2025 at 3:05 PM Mich Talebzadeh wrote:

> Hi Nikolas,
>
> Why Spark defaults to Snappy for Parquet: in analytics scans the
> bottleneck is usually CPU to decompress Parquet pages, not raw I/O.
> Snappy gives very fast decode at a decent ratio, so end-to-end query
> latency is typically better than with heavier codecs like GZIP. For colder
> data, GZIP (or ZSTD) can make sense if you're chasing storage savings and
> can afford slower reads.
>
> Two different codec decisions to make:
>
> 1. Intermediates (shuffle/spill/broadcast): speed > ratio.
>    I keep fast codecs here; changing them rarely helps unless the
>    network/disk is the bottleneck and I have spare CPU:
>
>    spark.conf.set("spark.shuffle.compress", "true")
>    spark.conf.set("spark.shuffle.spill.compress", "true")
>    spark.conf.set("spark.io.compression.codec", "lz4") // snappy or zstd are also viable
>
> 2. Storage at rest (final Parquet in the lake/lakehouse): pick by hot vs cold.
>    - Hot / frequently scanned: Snappy for fastest reads.
>    - Cold / archival: GZIP (or try ZSTD) for much smaller files; accept slower scans.
>
>    spark.conf.set("spark.sql.parquet.compression.codec", "snappy") // or "gzip" or "zstd"
>
> This mirrors what I wrote up for BigQuery external Parquet on object
> storage, as attached (different engine, same storage trade-off): I used
> Parquet + GZIP when exporting to Cloud Storage (great size reduction) and
> noted that external tables read slower than native, so I keep hot data
> "native" and push colder tiers to cheaper storage with heavier compression.
> In that piece, a toy query ran ~190 ms on native vs ~296 ms on the external
> table (roughly 56% slower), which is the kind of latency gap you trade for
> cost/footprint savings on colder data.
>
> Bigger levers than the codec:
> The codec choice matters, but reading fewer bytes matters more! In my
> article I lean heavily on Hive-style partition layouts for external Parquet
> (multiple partition keys, strict directory order), and call out gotchas
> like keeping non-Parquet junk out of leaf directories (external table
> creation/reads can fail or slow down if the layout is messy).
>
> How I would benchmark on your data:
> Write the same dataset three ways (snappy, gzip, zstd), then measure:
>
> - total bytes on storage,
> - Spark SQL scan time and CPU time in the UI,
> - the effect of partition pruning with realistic filters.
>
> Keep the shuffle settings fast (above) so you're testing scan costs, not
> an artificially slow shuffle.
>
> My rules of thumb:
>
> - If latency and interactive work matter → Snappy Parquet.
> - If storage $$ dominates and reads are rare → GZIP (or ZSTD as a middle ground).
> - Regardless of codec, partition pruning + sane file sizes move the needle
>   the most (that's the core of my "Hybrid Curated Storage" approach).
>
> HTH
>
> Regards,
> Dr Mich Talebzadeh
> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>
> (P.S. The background and examples I referenced are from my article on using
> GCS external Parquet with Snappy/GZIP/ZSTD and Hive partitioning for
> cost/perf balance; feel free to skim the compression/export and
> partitioning sections.)
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
> On Tue, 26 Aug 2025 at 17:59, Nikolas Vanderhoof <nikolasrvanderh...@gmail.com> wrote:
>
>> Hello,
>>
>> Why does Spark use Snappy by default when compressing data within
>> Parquet? I've read that when shuffling, speed is prioritized above
>> compression ratio. Is that true, and are there other things to consider?
>>
>> Also, are there any recent benchmarks that the community has performed
>> that evaluate the performance of Spark when using Snappy compared to other
>> codecs? I'd be interested not only in the impact when using other codecs
>> for the intermediate and shuffle files, but also for the storage at rest.
>> For example, I know there are different configuration options that allow
>> me to set the codec for these internal files, or for the final parquet
>> files stored in the lakehouse.
>>
>> Before I decide to use a codec other than the default in my work, I want
>> to understand any tradeoffs better.
>>
>> Thanks,
>> Nik
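A rough sketch of the benchmark flow described in the reply above: write the same DataFrame once per codec, compare bytes on storage, then time a representative scan of each copy (the authoritative scan/CPU numbers come from the Spark UI). The paths, the filter column, and the session setup here are placeholders, not anything from the thread:

```scala
// Rough benchmark sketch following the reply above; paths and column names are assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-codec-benchmark").getOrCreate()
val df = spark.read.parquet("/data/source")       // hypothetical input dataset

val codecs = Seq("snappy", "gzip", "zstd")

// 1. Write the same dataset once per codec (per-write override of
//    spark.sql.parquet.compression.codec via the Parquet data source option).
codecs.foreach { codec =>
  df.write
    .option("compression", codec)
    .mode("overwrite")
    .parquet(s"/data/bench/$codec")               // hypothetical output path
}

// 2. Compare total bytes on storage out of band (e.g. `hadoop fs -du -s -h /data/bench/*`),
//    then time a representative scan per copy. Wall-clock time here is only a first cut;
//    scan time and CPU time per stage come from the Spark UI / SQL tab.
codecs.foreach { codec =>
  val start = System.nanoTime()
  val rows = spark.read
    .parquet(s"/data/bench/$codec")
    .filter("event_date >= '2025-01-01'")         // hypothetical, realistic filter
    .count()
  val elapsedMs = (System.nanoTime() - start) / 1e6
  println(f"$codec%-7s rows=$rows%d scan=$elapsedMs%.0f ms")
}
```

Leaving the shuffle settings from the reply untouched keeps the comparison focused on scan cost rather than an artificially slow shuffle.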
Why Snappy Compression?
Hello,

Why does Spark use Snappy by default when compressing data within Parquet? I've read that when shuffling, speed is prioritized above compression ratio. Is that true, and are there other things to consider?

Also, are there any recent benchmarks that the community has performed that evaluate the performance of Spark when using Snappy compared to other codecs? I'd be interested not only in the impact when using other codecs for the intermediate and shuffle files, but also for the storage at rest. For example, I know there are different configuration options that allow me to set the codec for these internal files, or for the final parquet files stored in the lakehouse.

Before I decide to use a codec other than the default in my work, I want to understand any tradeoffs better.

Thanks,
Nik