Hello,

I think I noticed some Spark behavior with enormous potential for
performance improvement when reading files from a folder with (many
nested) Hive-style partitions while at the same time applying a filter on
the partition columns.
Situation

I have JSON files with log information stored in physical folders
partitioned by year, month, day and hour:

/logs
|-- year=2020
|-- year=2021
`-- year=2022
    |-- month=01
    `-- month=02
        |-- day=01
        |-- day=...
        `-- day=13
            |-- hour=0000
            |-- hour=...
            `-- hour=0900
                |-- log000001.json
                |-- <many files>
                `-- log099133.json

I'd like to *only read specific JSON files* into a Dataframe (e.g. only
those *partitions* of the last 2 hours).
Background info

Spark generally supports *partition discovery* for hive-like folder
structures just like in my example:

*All built-in file sources (including Text/CSV/JSON/ORC/Parquet) are able
to discover and infer partitioning information automatically.*

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery

So, when creating a dataframe by reading from the base path "/logs", the
dataframe will have the columns "year", "month", "day", "hour" - even when
these attributes are not contained in the JSON files directly. Spark
*derives* these columns from the folder names.
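
For illustration, the derived columns show up directly in the schema (a
minimal sketch, assuming a SparkSession named spark and the same
predefined_schema as in the snippets below):

df = (spark
  .read
    .format('json')
    .schema(predefined_schema)
    .load('/logs')
)
# 'year', 'month', 'day' and 'hour' appear as columns even though they are
# not part of the JSON documents themselves
df.printSchema()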
Problem

As stated above, I don't want to read *all* of those JSONs but only those
of the last two hours. Therefore I specify my dataframe as:

df1 = (spark
  .read
    .format('json')
    .schema(predefined_schema)
    .option('basePath', '/logs')
    .load('/logs')
  .filter('year=2022 AND month=04 AND day=19 AND hour>=1000 AND hour<=1200')
)
df1.explain(mode="formatted")

Executing this already takes a *couple of minutes* - even though no actual
data is read yet. Looking at the Spark logs, I found that Spark is *exploring
the whole file structure* under '/logs' - so the content of every single
folder below '/logs' gets listed. I'm wondering why that is. Given the
specified filter, Spark could use this information during folder exploration
already and take a shortcut: it would only need to list the folders under
'/logs/', then realize that all folders except '/logs/year=2022' are
irrelevant; proceed and explore the folders under '/logs/year=2022/', then
realize that all folders except '/logs/year=2022/month=04' are irrelevant;
and so on...

I implemented a function that basically does the folder exploration up
front and assembles a list of relevant folder paths.

relevant_folders = [
    '/logs/year=2022/month=04/day=19/hour=1000'
  , '/logs/year=2022/month=04/day=19/hour=1100'
  , '/logs/year=2022/month=04/day=19/hour=1200'
]
df2 = (spark
  .read
    .format('json')
    .schema(predefined_schema)
    .option('basePath', '/logs')
    .load(relevant_folders)
)
df2.explain(mode="formatted")

Executing this only *takes seconds*.
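
A rough sketch of the idea (not the exact function I used; it assumes the
default Hadoop-compatible file system and goes through Spark's internal py4j
handles spark._jvm / spark._jsc, so treat it as illustrative only):

def list_subdirs(spark, path, keep):
    """List a single folder level and keep only the sub-folders accepted by keep."""
    jvm = spark._jvm
    hadoop_conf = spark._jsc.hadoopConfiguration()
    jpath = jvm.org.apache.hadoop.fs.Path(path)
    fs = jpath.getFileSystem(hadoop_conf)
    return [status.getPath().toString()
            for status in fs.listStatus(jpath)
            if status.isDirectory() and keep(status.getPath().getName())]

# walk the partition levels top-down and prune at every step
# (1 + 1 + 1 + 1 = 4 list operations in this example)
years  = list_subdirs(spark, '/logs', lambda name: name == 'year=2022')
months = [m for y in years  for m in list_subdirs(spark, y, lambda name: name == 'month=04')]
days   = [d for m in months for d in list_subdirs(spark, m, lambda name: name == 'day=19')]
relevant_folders = [h for d in days
                    for h in list_subdirs(spark, d,
                                          lambda name: 'hour=1000' <= name <= 'hour=1200')]

Since the partition values are zero-padded, the plain string comparison on
the folder names is enough to select the hour range.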

I was wondering whether this shouldn't rather be built-in functionality in
Spark.
One more thing

Don't get me wrong: both dataframes above lead to more or less the same
execution plan and have similar execution times for the actual read
operation. In the end, in both cases Spark *only reads the physical JSON
files that fit the filter conditions*. However, *specifying* the dataframe
df1 took way longer than specifying df2.
Doing some math

For simplicity let's assume that we have data for three full years, every
month has 30 days, and every day has 24 hour folders.

For df1 that's *27,040 list operations* - one per folder that gets listed
(root: 1, year: 3, month: 36, day: 1,080, hour: 25,920)

For df2 that's 3 list operations; but I did 4 more list operations upfront
in order to come up with the relevant_folders list; so *7 list operations* in
total (root: 1, year: 1, month: 1, day: 1, hour: 3)
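
Just to double-check the arithmetic (under the assumptions above):

years, months_per_year, days_per_month, hours_per_day = 3, 12, 30, 24

# df1: every folder on every level is listed exactly once
month_dirs = years * months_per_year          # 36
day_dirs   = month_dirs * days_per_month      # 1,080
hour_dirs  = day_dirs * hours_per_day         # 25,920
print(1 + years + month_dirs + day_dirs + hour_dirs)   # 27,040

# df2: one folder per level upfront, plus the 3 selected hour folders
print(1 + 1 + 1 + 1 + 3)                                # 7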

Cheers

Martin
