Hi Siying, thanks for your reply.

We currently run with "spark.speculation: false", so it is not speculative
execution. The partition gets assigned to two different executors on
subsequent stages. In StateStore.scala, the doMaintenance() function calls
provider.doMaintenance() before verifying that the instance is active.
This is what leads to maintenance tasks running simultaneously for the
same partition id on two different executors [1].
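To make the ordering issue concrete, here is a minimal Python sketch (the
names and structure are illustrative only, not the actual Spark API): the
maintenance work runs before the ownership check, so an executor that no
longer owns the partition still performs cleanup.

```python
# Hypothetical sketch of the ordering problem. "provider" and
# "coordinator" stand in for the real StateStoreProvider and
# StateStoreCoordinator; method names are invented for illustration.

def do_maintenance_current(provider, coordinator, partition_id, executor_id):
    # Maintenance runs unconditionally, even on a stale executor.
    provider.do_maintenance()
    if not coordinator.verify_active(partition_id, executor_id):
        provider.close()  # too late: cleanup already ran concurrently

def do_maintenance_checked(provider, coordinator, partition_id, executor_id):
    # Check ownership first, so only the active instance cleans up.
    if coordinator.verify_active(partition_id, executor_id):
        provider.do_maintenance()
    else:
        provider.close()
```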

I also found that, since the number of versions to retain is > 1, a
timestamp from the N-1 version will usually protect the files for version
N. But this job goes through periods where almost no data is coming in, so
there are no SST files associated with N-1 with an earlier timestamp that
would have prevented the incorrect deletion.

Regarding checkpoint structure V2: unfortunately my organization is going
to stay on 3.5 for some time, so I would need to fix the issue on the 3.5
line. Checkpoint structure V2 does look promising, though.

Thanks again for your reply, and let me know if you have any additional
thoughts. What do you think of my suggestion to add a small padding to the
timestamp check to prevent the incorrect deletion?
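For clarity, the padding idea amounts to this check (a minimal Python
sketch; the function name and 60-second value are my suggestion, not
anything currently in Spark):

```python
# Hypothetical orphan-file deletion check with a grace period. An
# untracked SST file is deleted only if it is older than the oldest
# tracked file by more than the padding, so a concurrent maintenance
# task on another executor cannot have just written it.

PADDING_SECONDS = 60  # suggested grace period

def is_safe_to_delete(orphan_mtime, oldest_tracked_mtime,
                      padding=PADDING_SECONDS):
    return orphan_mtime < oldest_tracked_mtime - padding
```

The cost is keeping a few extra SST files in the DFS for up to the padding
interval; the benefit is that two near-simultaneous maintenance runs can
no longer delete each other's freshly written files.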


[1]
https://github.com/apache/spark/blob/f9a5c8cc92e2ce2e22613d7fde77427fb1699269/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L636

On Mon, Aug 25, 2025 at 1:33 PM Siying Dong <siying.d...@databricks.com>
wrote:

> I suspect that this problem will be mitigated with checkpoint structure V2
> ( https://issues.apache.org/jira/browse/SPARK-49374
> https://github.com/apache/spark/blob/bc36a7db43f287af536bb2767d7d9f1d70bc799f/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L2656
> ). The motivation of the new format is to eliminate parallel universes, so
> it would help this case too.
>
> Why do you have parallel universes for two executors for such a long time?
> Is it because of speculative execution?
>
> Thanks,
>
> Siying
>
> On Mon, Aug 18, 2025 at 9:42 AM Pedro Miguel Duarte <pmd...@gmail.com>
> wrote:
>
>> Hello structured streaming experts!
>>
>> We are getting SST FileNotFound state store corruption errors.
>>
>> The root cause is a race condition where two different executors are
>> doing cleanup of the state store at the same time.  Both write the version
>> of the state zip file to DFS.
>>
>> The first executor enters maintenance, writes SST files and writes the
>> 636.zip file.
>>
>> Concurrently the second executor enters maintenance, writes SST files and
>> writes 636.zip.
>>
>> The SST files in dfs are written almost simultaneously and are:
>>    - 000867-4695ff6e-d69d-4792-bcd6-191b57eadb9d.sst   <-- from one
>> executor
>>    - 000867-a71cb8b2-9ed8-4ec1-82e2-a406dd1fb949.sst   <-- from other
>> executor
>>
>> The problem occurs during orphan file deletion (see this PR
>> <https://github.com/apache/spark/pull/39897/files>). The executor that
>> lost the race to write 636.zip decides that it will delete the SST file
>> that is actually referenced in 636.zip.
>>
>> Currently the maintenance task does have some protection for files of
>> ongoing tasks. This is from the comments in RocksDBFileManager: "only
>> delete orphan files that are older than all tracked files when there are at
>> least 2 versions".
>>
>> In this case the file that is being deleted is indeed older but only by a
>> small amount of time.
>>
>> Instead of simply older, should there be some padding to allow for
>> maintenance being executed simultaneously on two executors?  Something like
>> at least 60s older than the oldest tracked file.
>>
>> This should help avoid state store corruption at the expense of some
>> storage space in the DFS.
>>
>> Thanks for any help or recommendations here.
>>
>
