Right... but the problem is still the same, no? Those N Jobs (aka Futures or Threads) will all be running on the Driver. Each with its own SparkSession. Isn't that going to put a lot of burden on one Machine? Is that really distributing the load across the cluster? Am I missing something?
Would it be better to use ECS (Elastic Container Service) for this use case, which allows us to autoscale?

On Tue, May 25, 2021 at 2:16 PM Sean Owen <sro...@gmail.com> wrote:

> What you could do is launch N Spark jobs in parallel from the driver. Each
> one would process a directory you supply with spark.read.parquet, for
> example. You would just have 10s or 100s of those jobs running at the same
> time. You have to write a bit of async code to do it, but it's pretty easy
> with Scala Futures.
>
> On Tue, May 25, 2021 at 3:31 PM Eric Beabes <mailinglist...@gmail.com> wrote:
>
>> Here's the use case:
>>
>> We've a bunch of directories (over 1000), each containing tons of small
>> files. Each directory is for a different customer, so they are independent
>> in that respect. We need to merge all the small files in each directory
>> into one (or a few) compacted file(s) by using a 'coalesce' function.
>>
>> Clearly we can do this on the Driver by doing something like:
>>
>> list.par.foreach(dir => compact(spark, dir))
>>
>> This works, but the problem here is that the parallelism happens on the
>> Driver, which won't scale when we've got 10,000 customers! At any given
>> time there will be only as many compactions happening as the number of
>> cores on the Driver, right?
>>
>> We were hoping to do this:
>>
>> val df = list.toDF()
>> df.foreach(dir => compact(spark, dir))
>>
>> Our hope was that this would distribute the load amongst the Spark
>> Executors & scale better. But this throws the NullPointerException shown
>> in the original email.
>>
>> Is there a better way to do this?
>>
>> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>>
>>> Why not just read from Spark as normal? Do these files have different
>>> or incompatible schemas?
>>>
>>> val df = spark.read.option("mergeSchema", "true").load(listOfPaths)
>>>
>>> From: Eric Beabes <mailinglist...@gmail.com>
>>> Date: Tuesday, May 25, 2021 at 1:24 PM
>>> To: spark-user <user@spark.apache.org>
>>> Subject: Reading parquet files in parallel on the cluster
>>>
>>> I've a use case in which I need to read Parquet files in parallel from
>>> over 1000 directories. I am doing something like this:
>>>
>>> val df = list.toList.toDF()
>>>
>>> df.foreach(c => {
>>>   val config = getConfigs()
>>>   doSomething(spark, config)
>>> })
>>>
>>> In the doSomething method, when I try to do this:
>>>
>>> val df1 = spark.read.parquet(pathToRead).collect()
>>>
>>> I get the NullPointerException given below. It seems 'spark.read' only
>>> works on the Driver, not on the cluster. How can I do what I want to do?
>>> Please let me know. Thank you.
>>>
>>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11): java.lang.NullPointerException
>>>   at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>>   at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>>   at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>>   at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
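
[Editor's note] For reference, a minimal sketch of the Futures approach Sean describes might look roughly like the following. It assumes a single shared SparkSession and a hypothetical compact() helper that reads one directory, coalesces it, and writes to a separate output path; the directory list, thread-pool size, and paths are illustrative, not taken from the thread.

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration
    import org.apache.spark.sql.SparkSession

    object CompactDirs {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("compact-dirs").getOrCreate()

        // Hypothetical list of customer directories; in practice this would be
        // built by listing the storage location.
        val dirs: Seq[String] = Seq("s3://bucket/customer1", "s3://bucket/customer2")

        // Bound the number of Spark jobs submitted concurrently from the driver.
        implicit val ec: ExecutionContext =
          ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))

        // Each Future only submits a job; the read/coalesce/write runs on executors.
        val jobs: Seq[Future[Unit]] = dirs.map { dir =>
          Future {
            compact(spark, dir)
          }
        }

        Await.result(Future.sequence(jobs), Duration.Inf)
        spark.stop()
      }

      // Hypothetical compact() helper: read one directory, reduce the number of
      // output files, and write the result to a "-compacted" sibling path.
      def compact(spark: SparkSession, dir: String): Unit = {
        spark.read.parquet(dir)
          .coalesce(1)
          .write.mode("overwrite")
          .parquet(dir + "-compacted")
      }
    }

Note that the driver threads only schedule and wait on jobs; the heavy lifting (reading, shuffling, writing) happens on the executors, so the thread-pool size mainly bounds how many jobs are in flight at once. Running with the FAIR scheduler (spark.scheduler.mode=FAIR) can help those concurrent jobs share the cluster more evenly.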