lrsb created FLINK-39891:
----------------------------
Summary: Cancel orphaned session jobs on the session cluster
before (re)submitting a FlinkSessionJob
Key: FLINK-39891
URL: https://issues.apache.org/jira/browse/FLINK-39891
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Affects Versions: 1.15.0, 1.14.0, 1.12.0, 1.13
Reporter: lrsb
Background
FlinkSessionJob submission generates a random JobID, records it in status as
RECONCILING, then submits to the session cluster.
FLINK-38858 hardened this against the common crash window: on a retry the
operator reuses the recorded JobID while the state is RECONCILING, and Flink
rejects a same-JobID resubmit with DuplicateJobSubmissionException. This
prevents duplicates as long as the recorded JobID survives.
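The reuse rule described above can be sketched as follows. This is a simplified model, not the operator's code: the class name, the State enum, and the UUID-based stand-in for Flink's JobID generation are assumptions made for illustration.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical, simplified model of JobID selection on (re)submission. */
final class JobIdSelection {
    enum State { RECONCILING, RUNNING, FINISHED }

    /**
     * Reuses the recorded JobID only while the status is RECONCILING.
     * Otherwise returns a fresh random ID (32 hex characters, a stand-in
     * for a real Flink JobID).
     */
    static String selectJobId(Optional<String> recordedJobId, State state) {
        if (recordedJobId.isPresent() && state == State.RECONCILING) {
            return recordedJobId.get();
        }
        return UUID.randomUUID().toString().replace("-", "");
    }
}
```

With a reused ID, a second submission of the same job is rejected by the cluster instead of starting a duplicate.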
Problem
A residual window remains in which the operator generates a new JobID while a
job from a previous submission is still running on the session cluster, leaving
the old job orphaned. Two jobs then run concurrently against the same
sources/sinks, breaking exactly-once semantics and double-committing to external
systems. This happens whenever the recorded JobID is not reused. For example,
the submit REST call succeeds on the cluster, but the operator crashes before
the status patch persisting the JobID is durably recorded. On retry,
existingJobId == null, so a new JobID is generated.
The existing checkIfAlreadyUpgraded only probes the currently recorded JobID,
so it cannot detect a job running under a different, unrecorded JobID.
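The failure window can be reproduced with a small in-memory model. FakeCluster, submitOnce, and runScenario are hypothetical names used only for this sketch, and the lost status patch is simulated by discarding the returned JobID.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;

final class OrphanWindowDemo {
    /** Hypothetical in-memory stand-in for the session cluster. */
    static final class FakeCluster {
        final Set<String> running = new LinkedHashSet<>();

        /** Rejects a resubmit of the same JobID, accepts any other ID. */
        void submit(String jobId) {
            if (!running.add(jobId)) {
                throw new IllegalStateException("duplicate JobID " + jobId);
            }
        }
    }

    /** Submits with the recorded JobID if present, else with a new random one. */
    static String submitOnce(FakeCluster cluster, String recordedJobId) {
        String jobId = recordedJobId != null
                ? recordedJobId
                : UUID.randomUUID().toString().replace("-", "");
        cluster.submit(jobId);
        return jobId;
    }

    /** Returns the number of jobs running after a crash between submit and status patch. */
    static int runScenario() {
        FakeCluster cluster = new FakeCluster();
        String recorded = null;        // nothing persisted in status yet
        submitOnce(cluster, recorded); // submit succeeds; ID is lost with the crash
        submitOnce(cluster, recorded); // retry: still null, so a new JobID is used
        return cluster.running.size();
    }
}
```

Because the second submission carries a different JobID, the duplicate check never fires and both jobs stay running.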
Proposed change
Add an opt-in safeguard: before submitting (only when *not* reusing an existing
JobID), list the non-terminal jobs on the session cluster, cancel any that
belong to this FlinkSessionJob, wait for them to reach a terminal state, then
submit. Gate this behind a new dynamic config option that defaults to false:
kubernetes.operator.session-job.cancel-orphaned-on-submit.
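A minimal sketch of the proposed flow, under stated assumptions: SessionClusterClient, JobInfo, and InMemoryClient are hypothetical types, not operator or Flink APIs, and the rule for deciding that a job "belongs to this FlinkSessionJob" is not specified in this issue, so it is passed in as a predicate. The timeout and poll interval are likewise illustrative parameters.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class OrphanCleanupSketch {
    /** Hypothetical minimal view of a job on the session cluster. */
    static final class JobInfo {
        final String jobId;
        final String name;
        final boolean terminal;
        JobInfo(String jobId, String name, boolean terminal) {
            this.jobId = jobId; this.name = name; this.terminal = terminal;
        }
    }

    /** Hypothetical client standing in for the session cluster's REST API. */
    interface SessionClusterClient {
        List<JobInfo> listJobs();
        void cancel(String jobId);
    }

    /** Cancels owned non-terminal jobs, waits for them to terminate, returns their IDs. */
    static List<String> cancelOrphans(SessionClusterClient client,
            Predicate<JobInfo> ownedByThisSessionJob, boolean cancelOrphanedOnSubmit,
            String existingJobId, Duration timeout, Duration pollInterval) {
        List<String> orphans = new ArrayList<>();
        if (!cancelOrphanedOnSubmit || existingJobId != null) {
            return orphans; // opt-in only; skipped when reusing the recorded JobID
        }
        for (JobInfo job : client.listJobs()) {
            if (!job.terminal && ownedByThisSessionJob.test(job)) {
                client.cancel(job.jobId);
                orphans.add(job.jobId);
            }
        }
        long deadline = System.nanoTime() + timeout.toNanos();
        while (anyNonTerminal(client, orphans)) {
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("orphaned jobs did not terminate in time");
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return orphans; // the caller may now submit with a new JobID
    }

    private static boolean anyNonTerminal(SessionClusterClient client, List<String> ids) {
        for (JobInfo job : client.listJobs()) {
            if (ids.contains(job.jobId) && !job.terminal) return true;
        }
        return false;
    }

    /** In-memory fake used only to exercise the sketch; cancellation is immediate. */
    static final class InMemoryClient implements SessionClusterClient {
        private final Map<String, JobInfo> jobs = new LinkedHashMap<>();
        void add(String jobId, String name) { jobs.put(jobId, new JobInfo(jobId, name, false)); }
        @Override public List<JobInfo> listJobs() { return new ArrayList<>(jobs.values()); }
        @Override public void cancel(String jobId) {
            JobInfo job = jobs.get(jobId);
            if (job != null) jobs.put(jobId, new JobInfo(jobId, job.name, true));
        }
    }
}
```

Failing the submission on timeout, rather than submitting anyway, is one possible choice here: it keeps the guarantee that no two jobs for the same FlinkSessionJob run at once, at the cost of a delayed start.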
--
This message was sent by Atlassian Jira
(v8.20.10#820010)