Thanks for your time & advice. We will experiment & see which works best for us: EMR or ECS.
On Tue, May 25, 2021 at 2:39 PM Sean Owen <sro...@gmail.com> wrote:

> No, the work is happening on the cluster; you just have (say) 100 parallel
> jobs running at the same time. You apply spark.read.parquet to each dir --
> from the driver, yes, but spark.read is distributed. At extremes, yes, that
> would challenge the driver, to manage 1000s of jobs concurrently. You may
> also find that if each job is tiny, there's some overhead in running each
> as a distributed operation that may be significant. But it seems like the
> simplest thing and will probably work fine.
>
> On Tue, May 25, 2021 at 4:34 PM Eric Beabes <mailinglist...@gmail.com> wrote:
>
>> Right... but the problem is still the same, no? Those N jobs (aka Futures
>> or Threads) will all be running on the Driver, each with its own
>> SparkSession. Isn't that going to put a lot of burden on one machine? Is
>> that really distributing the load across the cluster? Am I missing
>> something?
>>
>> Would it be better to use ECS (Elastic Container Service) for this use
>> case, which allows us to autoscale?
>>
>> On Tue, May 25, 2021 at 2:16 PM Sean Owen <sro...@gmail.com> wrote:
>>
>>> What you could do is launch N Spark jobs in parallel from the driver.
>>> Each one would process a directory you supply with spark.read.parquet, for
>>> example. You would just have 10s or 100s of those jobs running at the same
>>> time. You have to write a bit of async code to do it, but it's pretty easy
>>> with Scala Futures.
>>>
>>> On Tue, May 25, 2021 at 3:31 PM Eric Beabes <mailinglist...@gmail.com> wrote:
>>>
>>>> Here's the use case:
>>>>
>>>> We have a bunch of directories (over 1000), each containing tons of small
>>>> files. Each directory is for a different customer, so they are independent
>>>> in that respect. We need to merge all the small files in each directory
>>>> into one (or a few) compacted file(s) using 'coalesce'.
>>>>
>>>> Clearly we can do this on the Driver with something like:
>>>>
>>>> list.par.foreach(dir => compact(spark, dir))
>>>>
>>>> This works, but the problem is that the parallelism happens on the
>>>> Driver, which won't scale when we have 10,000 customers! At any given time
>>>> there will only be as many compactions running as there are cores on the
>>>> Driver, right?
>>>>
>>>> We were hoping to do this instead:
>>>>
>>>> val df = list.toDF()
>>>> df.foreach(dir => compact(spark, dir))
>>>>
>>>> Our hope was that this would distribute the load amongst the Spark
>>>> Executors and scale better, but it throws the NullPointerException shown
>>>> in the original email.
>>>>
>>>> Is there a better way to do this?
>>>>
>>>> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito
>>>> <silvio.fior...@granturing.com> wrote:
>>>>
>>>>> Why not just read from Spark as normal? Do these files have different
>>>>> or incompatible schemas?
>>>>>
>>>>> val df = spark.read.option("mergeSchema", "true").load(listOfPaths)
>>>>>
>>>>> From: Eric Beabes <mailinglist...@gmail.com>
>>>>> Date: Tuesday, May 25, 2021 at 1:24 PM
>>>>> To: spark-user <user@spark.apache.org>
>>>>> Subject: Reading parquet files in parallel on the cluster
>>>>>
>>>>> I have a use case in which I need to read Parquet files in parallel
>>>>> from over 1000 directories. I am doing something like this:
>>>>>
>>>>> val df = list.toList.toDF()
>>>>>
>>>>> df.foreach(c => {
>>>>>   val config = getConfigs()
>>>>>   doSomething(spark, config)
>>>>> })
>>>>>
>>>>> In the doSomething method, when I try to do this:
>>>>>
>>>>> val df1 = spark.read.parquet(pathToRead).collect()
>>>>>
>>>>> I get the NullPointerException shown below. It seems 'spark.read' only
>>>>> works on the Driver, not on the cluster. How can I do what I want to do?
>>>>> Please let me know. Thank you.
>>>>>
>>>>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID
>>>>> 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>>>>> java.lang.NullPointerException
>>>>>
>>>>> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>>>>
>>>>> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>>>>
>>>>> at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>>>>
>>>>> at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
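For the archives, here is a minimal sketch of the Futures-based approach Sean describes: a bounded thread pool on the driver submits one compaction job per directory, while the actual read/coalesce/write work runs on the executors. The compact helper, the pool size of 16, the coalesce(1) factor, and the "_compacted" output suffix are illustrative assumptions, not details from this thread.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.sql.SparkSession

object CompactDirs {

  // Illustrative helper: read one customer directory and rewrite it as a
  // single file. Writing to a separate "_compacted" path avoids reading and
  // overwriting the same directory within one job; adjust to taste.
  def compact(spark: SparkSession, dir: String): Unit = {
    spark.read.parquet(dir)
      .coalesce(1)
      .write
      .mode("overwrite")
      .parquet(dir + "_compacted")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("compact-dirs").getOrCreate()

    // Assumed: the list of customer directories comes from somewhere upstream.
    val dirs: Seq[String] = Seq.empty

    // Bounded pool: only this many compaction jobs run concurrently, and the
    // threads mostly just wait on Spark jobs, so the pool can be larger than
    // the number of driver cores (unlike list.par, which is capped by them).
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))

    // Each Future uses the shared SparkSession from the driver side only;
    // this avoids the NullPointerException seen when spark is referenced
    // inside df.foreach, where no SparkSession exists on the executors.
    val jobs = dirs.map(dir => Future(compact(spark, dir)))

    Await.result(Future.sequence(jobs), Duration.Inf)
    spark.stop()
  }
}

Whether this beats simply reading everything in one go with mergeSchema, as Silvio suggests, depends on whether the output really needs to stay as one compacted file (or a few) per customer directory.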