Right... but the problem is still the same, no? Those N Jobs (aka Futures
or Threads) will all be running on the Driver. Each with its own
SparkSession. Isn't that going to put a lot of burden on one Machine? Is
that really distributing the load across the cluster? Am I missing
something?

Would it be better to use ECS (Elastic Container Service) for this use case
which allows us to autoscale?

On Tue, May 25, 2021 at 2:16 PM Sean Owen <sro...@gmail.com> wrote:

> What you could do is launch N Spark jobs in parallel from the driver. Each
> one would process a directory you supply with spark.read.parquet, for
> example. You would just have 10s or 100s of those jobs running at the same
> time.  You have to write a bit of async code to do it, but it's pretty easy
> with Scala Futures.
>
> On Tue, May 25, 2021 at 3:31 PM Eric Beabes <mailinglist...@gmail.com>
> wrote:
>
>> Here's the use case:
>>
>> We've a bunch of directories (over 1000) which contain tons of small
>> files in each. Each directory is for a different customer so they are
>> independent in that respect. We need to merge all the small files in each
>> directory into one (or a few) compacted file(s) by using a 'coalesce'
>> function.
>>
>> Clearly we can do this on the Driver by doing something like:
>>
>> list.par.foreach (dir =>compact(spark, dir))
>>
>> This works but the problem here is that the parallelism happens on Driver
>> which won't scale when we've 10,000 customers! At any given time there will
>> be only as many compactions happening as the number of cores on the Driver,
>> right?
>>
>> We were hoping to do this:
>>
>> val df = list.toDF()
>> df.foreach(dir => compact(spark,dir))
>>
>> Our hope was, this will distribute the load amongst Spark Executors &
>> will scale better.  But this throws the NullPointerException shown in the
>> original email.
>>
>> Is there a better way to do this?
>>
>>
>> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
>> silvio.fior...@granturing.com> wrote:
>>
>>> Why not just read from Spark as normal? Do these files have different or
>>> incompatible schemas?
>>>
>>>
>>>
>>> val df = spark.read.option(“mergeSchema”, “true”).load(listOfPaths)
>>>
>>>
>>>
>>> *From: *Eric Beabes <mailinglist...@gmail.com>
>>> *Date: *Tuesday, May 25, 2021 at 1:24 PM
>>> *To: *spark-user <user@spark.apache.org>
>>> *Subject: *Reading parquet files in parallel on the cluster
>>>
>>>
>>>
>>> I've a use case in which I need to read Parquet files in parallel from
>>> over 1000+ directories. I am doing something like this:
>>>
>>>
>>>
>>>    val df = list.toList.toDF()
>>>
>>>     df.foreach(c => {
>>>       val config = *getConfigs()*
>>> *      doSomething*(spark, config)
>>>     })
>>>
>>>
>>>
>>> In the doSomething method, when I try to do this:
>>>
>>> val df1 = spark.read.parquet(pathToRead).collect()
>>>
>>>
>>>
>>> I get a NullPointer exception given below. It seems the 'spark.read' only 
>>> works on the Driver not on the cluster. How can I do what I want to do? 
>>> Please let me know. Thank you.
>>>
>>>
>>>
>>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID
>>> 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>>> java.lang.NullPointerException
>>>
>>>
>>>
>>>         at
>>> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>>
>>>
>>>
>>>         at
>>> org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>>
>>>
>>>
>>>         at
>>> org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>>
>>>
>>>
>>>         at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
>>>
>>>
>>>
>>>

Reply via email to