Dear Spark Community,

I run a Structured Streaming query that reads JSON files from S3 into an Iceberg
table. This is my query:

```python
import uuid

from pyspark.sql.functions import input_file_name

stream_reader = (
    spark_session.readStream.format("json")
    .schema(schema)
    # Cap the number of files picked up per micro-batch.
    .option("maxFilesPerTrigger", 256_000)
    .option("basePath", "s3a://test-bucket/root_dir/")
    .load("s3a://test-bucket/root_dir/2025/04/")
    .coalesce(8)
    # Keep the source object key alongside each record.
    .withColumn("object_key", input_file_name())
)
stream = (
    stream_reader.writeStream.queryName("test_stream")
    .format("iceberg")
    .outputMode("append")
    .option(
        "checkpointLocation",
        f"s3a://test-bucket/checkpoints/{uuid.uuid4()}/",
    )
    .trigger(processingTime="10 seconds")
    .toTable(target_table_full_name)
)
```

My data on S3 has this structure:
```
root_dir/
└── 2025/
    └── 04/
        ├── 15/
        │   ├── 123e4567-e89b-12d3-a456-426614174003.json
        │   └── 123e4567-e89b-12d3-a456-426614174004.json
        ├── 16/
        │   ├── 123e4567-e89b-12d3-a456-426614174000.json
        │   ├── 123e4567-e89b-12d3-a456-426614174001.json
        │   └── 123e4567-e89b-12d3-a456-426614174002.json
        └── 17/
            ├── 123e4567-e89b-12d3-a456-426614174005.json
            └── 123e4567-e89b-12d3-a456-426614174006.json
```

There are millions of these files, each roughly 1.5 KB.

I run into issues with the initial file listing: when I start the stream, I see 
this in the logs:
```
Total size of serialized results of 51 tasks (1073.3 MiB) is bigger than 
spark.driver.maxResultSize (1024.0 MiB)
```
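
The limit being hit is `spark.driver.maxResultSize`, and raising it would presumably only postpone the problem, since the serialized listing grows with the file count. For reference, a minimal sketch of that stopgap (the value is an arbitrary example, and as far as I understand it has to be applied where the driver is configured, not from a Spark Connect client):

```python
from pyspark.sql import SparkSession

# Stopgap sketch only: "4g" is an arbitrary example value, and the serialized
# listing still grows with the number of files, so this does not really scale.
# A driver config like this must be set where the driver is created (e.g. in
# spark-defaults.conf on the standalone cluster), not from the Connect client.
spark = (
    SparkSession.builder
    .config("spark.driver.maxResultSize", "4g")
    .getOrCreate()
)
```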

51 is the number of subdirectories in my test setup. It seems that Spark 
treats the subdirectories as partitions and performs the listing per 
partition, but in the end it still aggregates everything on the driver. This 
error occurs with a total of 5.5 million files.
Setting maxFilesPerTrigger does not limit this initial listing either.
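
Would disabling partition discovery be a workable direction? A sketch of what I have in mind, dropping basePath and asking for a recursive listing (I have not verified whether recursiveFileLookup is honoured by the streaming file source):

```python
from pyspark.sql.functions import input_file_name

# Hypothetical variant: without basePath no partition columns are inferred,
# and recursiveFileLookup asks Spark to walk the prefix as a flat listing
# instead of listing per "partition" directory. Untested with readStream.
stream_reader = (
    spark_session.readStream.format("json")
    .schema(schema)
    .option("maxFilesPerTrigger", 256_000)
    .option("recursiveFileLookup", "true")
    .load("s3a://test-bucket/root_dir/2025/04/")
    .coalesce(8)
    .withColumn("object_key", input_file_name())
)
```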

Please give me a hint on how to handle this initial listing for potentially 
billions of files.

My setup is a standalone Spark 3.5.1 cluster with Spark Connect.

Best regards,
Anastasiia
