joeyutong opened a new pull request, #581:
URL: https://github.com/apache/flink-agents/pull/581
…fter rescaling
### Purpose of change
`ActionExecutionOperator` persists `currentProcessingKeysOpState` as union
operator state, so after a restore or rescale every subtask receives the full
list of in-flight keys. However, `tryResumeProcessActionTasks()` resumed every
restored key without checking whether the key belonged to the current subtask.
This is incorrect after scale-out. Because every subtask receives every union
state entry, subtasks that do not own a key also enqueue resume work for it,
which can lead to duplicated processing after recovery.
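A minimal, framework-free model of the problem may help. The class and method names below are hypothetical and are not Flink or Flink Agents code. Union redistribution hands every subtask a full copy of the list, so resuming everything that was restored runs each key once per subtask:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a union-state restore WITHOUT an ownership check. */
public class UnionRestoreDemo {

    /** Union redistribution: every subtask receives a full copy of all entries. */
    static List<List<String>> redistributeUnion(List<String> entries, int parallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>(entries));
        }
        return perSubtask;
    }

    /** Counts how often each key is resumed if every subtask resumes everything it restored. */
    static Map<String, Integer> naiveResumeCounts(List<String> entries, int parallelism) {
        Map<String, Integer> counts = new HashMap<>();
        for (List<String> restored : redistributeUnion(entries, parallelism)) {
            for (String key : restored) {
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // With parallelism=2, each in-flight key is resumed twice.
        System.out.println(naiveResumeCounts(List.of("a", "b"), 2));
    }
}
```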
This change fixes the restore path by filtering
`currentProcessingKeysOpState` with subtask ownership before resubmitting work:
- compute the key group of each restored key
- compute the subtask that owns that key group, given `maxParallelism` and
the current parallelism
- resume only the keys owned by the current subtask
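The steps above can be sketched as follows. In Flink, the two computations are normally done with `KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)` and `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)`. This sketch avoids the Flink dependency, so two assumptions apply. First, `keyGroupFor` uses a stand-in hash (`Math.floorMod` of `hashCode()`) rather than Flink's murmur-hash-based assignment. Second, all names are hypothetical and are not the PR's implementation. Only the contiguous key-group-to-subtask split is meant to mirror Flink:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of an ownership filter for restored keys.
 * keyGroupFor uses a STAND-IN hash, not Flink's murmur-hash-based assignment.
 */
public class KeyOwnership {

    /** Stand-in for KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Contiguous split of key groups over subtasks (as in computeOperatorIndexForKeyGroup). */
    static int ownerSubtask(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** True if the key's key group maps to the given subtask index. */
    static boolean isOwnedBy(Object key, int subtaskIndex, int maxParallelism, int parallelism) {
        return ownerSubtask(keyGroupFor(key, maxParallelism), maxParallelism, parallelism) == subtaskIndex;
    }

    /** Keeps only the restored keys that this subtask should resume. */
    static <K> List<K> keysToResume(List<K> restoredKeys, int subtaskIndex, int maxParallelism, int parallelism) {
        List<K> owned = new ArrayList<>();
        for (K key : restoredKeys) {
            if (isOwnedBy(key, subtaskIndex, maxParallelism, parallelism)) {
                owned.add(key);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // maxParallelism=128, parallelism=2: key groups 0..63 -> subtask 0, 64..127 -> subtask 1.
        System.out.println(keysToResume(List.of(5, 100), 0, 128, 2)); // [5]
        System.out.println(keysToResume(List.of(5, 100), 1, 128, 2)); // [100]
    }
}
```

Filtering each restored list locally keeps the union-state layout unchanged. Every subtask still sees all keys, but only the owner acts on each one.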
The ownership check is encapsulated in `isKeyOwnedByCurrentSubtask(...)` in
`ActionExecutionOperator.java`.
### Tests
Added an operator-level restore/rescaling regression test in
`ActionExecutionOperatorTest.java`:
- snapshot state from parallelism=1
- repartition it to parallelism=2
- restore both subtasks
- verify that only the owning subtask receives a resume mail and the
non-owning subtask does not
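The invariant this test checks can also be expressed as a framework-free simulation. This is not the PR's test and does not use Flink's test harness. The class and method names are hypothetical, and key-group assignment uses a stand-in hash rather than Flink's. After a 1 to 2 rescale, every key should be resumed by exactly one subtask:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of the restore/rescale scenario; stand-in hash, not Flink's. */
public class RescaleResumeCheck {

    static int owner(Object key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(key.hashCode(), maxParallelism); // stand-in hash
        return keyGroup * parallelism / maxParallelism;
    }

    /** Keys each subtask resumes when it restores the full union list and filters by ownership. */
    static List<List<Object>> resumedPerSubtask(List<?> snapshotKeys, int maxParallelism, int newParallelism) {
        List<List<Object>> perSubtask = new ArrayList<>();
        for (int subtask = 0; subtask < newParallelism; subtask++) {
            List<Object> resumed = new ArrayList<>();
            for (Object key : snapshotKeys) { // union state: every subtask sees every key
                if (owner(key, maxParallelism, newParallelism) == subtask) {
                    resumed.add(key);
                }
            }
            perSubtask.add(resumed);
        }
        return perSubtask;
    }

    /** Total resumes per key across all subtasks; with the ownership filter each count is 1. */
    static Map<Object, Integer> resumeCounts(List<List<Object>> perSubtask) {
        Map<Object, Integer> counts = new HashMap<>();
        for (List<Object> resumed : perSubtask) {
            for (Object key : resumed) {
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // In-flight keys snapshotted at parallelism=1, restored at parallelism=2.
        List<List<Object>> perSubtask = resumedPerSubtask(List.of(5, 100), 128, 2);
        System.out.println(perSubtask); // [[5], [100]]
        System.out.println(resumeCounts(perSubtask).values().stream().allMatch(c -> c == 1)); // true
    }
}
```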
### Documentation
<!-- Do not remove this section. Check the proper box only. -->
- [ ] `doc-needed` <!-- Your PR changes impact docs -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-included` <!-- Your PR already contains the necessary
documentation updates -->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]