Dennis-Mircea Ciupitu created FLINK-39752:
---------------------------------------------
Summary: Thread EventRecorder through FlinkResourceContext instead of holding it on the factory
Key: FLINK-39752
URL: https://issues.apache.org/jira/browse/FLINK-39752
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.15.0
Reporter: Dennis-Mircea Ciupitu
Fix For: kubernetes-operator-1.16.0
h1. Background
{{FlinkResourceContextFactory}} currently holds an {{EventRecorder}} as a
constructor-injected field. That recorder is created once at {{FlinkOperator}}
construction time and shared across every controller, regardless of which
controller is calling {{getResourceContext(...)}}.
At the same time, {{FlinkSessionJobController}} and
{{FlinkStateSnapshotController}} each create their own local {{EventRecorder}}
via {{EventRecorder.create(client, listeners)}} during {{register*Controller}},
and {{FlinkBlueGreenDeploymentController}} has none at all. The result is an
inconsistency:
* Reconcile code in a controller emits events through that controller's local
recorder.
* The {{FlinkService}} built inside the same reconcile (via
{{ctxFactory.getFlinkService(ctx)}}) emits events through the operator-scoped
recorder held on the factory.
In other words, a single reconcile uses two different {{EventRecorder}}
instances for events that conceptually belong to the same controller.
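
The mismatch can be sketched with simplified stand-ins. Everything below ({{Recorder}}, {{ContextFactory}}, {{Controller}}, {{recorderUsedByService}}) is a hypothetical reduction for illustration, not the operator's actual classes; it only shows which recorder instance each code path ends up touching.

```java
import java.util.ArrayList;
import java.util.List;

public class RecorderMismatchSketch {

    /** Hypothetical stand-in for EventRecorder; the label only tells instances apart. */
    static final class Recorder {
        final String owner;

        Recorder(String owner) {
            this.owner = owner;
        }
    }

    /** Stand-in for the factory as described above: it holds the operator-scoped recorder. */
    static final class ContextFactory {
        private final Recorder operatorRecorder;

        ContextFactory(Recorder operatorRecorder) {
            this.operatorRecorder = operatorRecorder;
        }

        /** Whatever controller calls in, the service is built with the factory's recorder. */
        Recorder recorderUsedByService() {
            return operatorRecorder;
        }
    }

    /** Stand-in for a controller that created its own local recorder at registration. */
    static final class Controller {
        private final Recorder localRecorder;
        private final ContextFactory factory;

        Controller(Recorder localRecorder, ContextFactory factory) {
            this.localRecorder = localRecorder;
            this.factory = factory;
        }

        /** Returns the owners of the recorders touched during one reconcile. */
        List<String> reconcile() {
            List<String> used = new ArrayList<>();
            used.add(localRecorder.owner);                   // reconcile code path
            used.add(factory.recorderUsedByService().owner); // service code path
            return used;
        }
    }

    public static void main(String[] args) {
        ContextFactory factory = new ContextFactory(new Recorder("operator"));
        Controller sessionJob = new Controller(new Recorder("session-job"), factory);
        System.out.println(sessionJob.reconcile()); // prints [session-job, operator]
    }
}
```

A single {{reconcile()}} call reports two different owners, which is the inconsistency described above.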
h1. Why this matters
Today {{EventRecorder}} is effectively stateless. Every public method takes the
{{KubernetesClient}} as a parameter and forwards to {{EventUtils}}, and the
only instance state is two {{BiConsumer}} closures over {{client}} and
{{listeners}}. Two recorders constructed with the same {{client}} and the same
{{listeners}} behave identically, so the asymmetry described above does not
cause observable bugs in production right now.
It is, however, a latent bug. The moment any per-instance state is added to
{{EventRecorder}} (per-controller dedup cache, per-controller listener subset,
per-controller metrics, rate limiting), events emitted by {{FlinkService}} will
silently route through the wrong recorder. That failure mode is hard to detect
in review because the divergence spans two files and the type signatures look
correct.
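
To make the failure mode concrete, here is a minimal sketch that assumes a per-instance dedup cache is added at some point. {{DedupRecorder}} and its methods are hypothetical; the real {{EventRecorder}} has no such state today. The same event key emitted through two separate instances is recorded twice, while a single shared instance suppresses the repeat.

```java
import java.util.HashSet;
import java.util.Set;

public class DedupDivergenceSketch {

    /** Hypothetical recorder with a per-instance dedup cache. */
    static final class DedupRecorder {
        private final Set<String> seen = new HashSet<>();
        private int emitted = 0;

        /** Emits the event unless this instance has already seen the same key. */
        boolean record(String key) {
            if (!seen.add(key)) {
                return false; // duplicate for this instance, suppressed
            }
            emitted++;
            return true;
        }

        int emitted() {
            return emitted;
        }
    }

    /** Reconcile code and service code each use their own recorder instance. */
    static int emitThroughSeparateRecorders(String key) {
        DedupRecorder controllerRecorder = new DedupRecorder();
        DedupRecorder factoryRecorder = new DedupRecorder();
        controllerRecorder.record(key); // from reconcile code
        factoryRecorder.record(key);    // from the service built by the factory
        return controllerRecorder.emitted() + factoryRecorder.emitted();
    }

    /** Both code paths go through the same recorder instance. */
    static int emitThroughSharedRecorder(String key) {
        DedupRecorder recorder = new DedupRecorder();
        recorder.record(key);
        recorder.record(key);
        return recorder.emitted();
    }

    public static void main(String[] args) {
        String key = "job-a/SubmitFailed";
        System.out.println("separate=" + emitThroughSeparateRecorders(key)
                + " shared=" + emitThroughSharedRecorder(key)); // prints separate=2 shared=1
    }
}
```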
The current naming pattern (a local {{var eventRecorder =
EventRecorder.create(...)}} in each {{register*Controller}}) also misleads
readers into thinking each controller owns its recorder end-to-end, when in
fact the factory keeps using the operator-scoped one.
h1. Proposal
Thread {{EventRecorder}} through the context lifecycle instead of holding it on
the factory.
* {{FlinkResourceContextFactory}} no longer takes an {{EventRecorder}} as a
constructor argument.
* {{FlinkResourceContext}} (and its concrete subclasses) gains an
{{EventRecorder}} field.
* {{getResourceContext(resource, josdkContext, eventRecorder)}} accepts the
recorder per call and stores it on the context.
* {{getFlinkService(ctx)}} reads the recorder via {{ctx.getEventRecorder()}}
instead of from a factory field.
* Each {{register*Controller}} method in {{FlinkOperator}} owns its own
{{EventRecorder}} (including {{registerBlueGreenController}}, which gains one
for the first time) and passes it into the controller. Every
{{ctxFactory.getResourceContext(...)}} call site in production code passes the
calling controller's recorder.
This mirrors how {{josdkContext}} is already handled, since both are
per-reconcile inputs rather than factory-lifetime dependencies.
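
The proposed shape might look roughly like the sketch below. The types are simplified, hypothetical stand-ins ({{Recorder}}, {{ResourceContext}}, {{Service}}, {{ContextFactory}}); the resource is reduced to a {{String}} and {{josdkContext}} to an {{Object}}. Only the method names {{getResourceContext}}, {{getFlinkService}} and {{getEventRecorder}} are taken from the proposal; the real signatures may differ.

```java
public class ThreadedRecorderSketch {

    /** Hypothetical stand-in for EventRecorder; the label only tells instances apart. */
    static final class Recorder {
        final String owner;

        Recorder(String owner) {
            this.owner = owner;
        }
    }

    /** Stand-in for FlinkResourceContext: carries the recorder it was created with. */
    static final class ResourceContext {
        private final String resource;
        private final Recorder eventRecorder;

        ResourceContext(String resource, Recorder eventRecorder) {
            this.resource = resource;
            this.eventRecorder = eventRecorder;
        }

        String getResource() {
            return resource;
        }

        Recorder getEventRecorder() {
            return eventRecorder;
        }
    }

    /** Stand-in for FlinkService; remembers which recorder it was built with. */
    static final class Service {
        final Recorder recorder;

        Service(Recorder recorder) {
            this.recorder = recorder;
        }
    }

    /** Stand-in for FlinkResourceContextFactory: no recorder field any more. */
    static final class ContextFactory {

        /** The recorder is a per-call input, like josdkContext, and is stored on the context. */
        ResourceContext getResourceContext(String resource, Object josdkContext, Recorder eventRecorder) {
            return new ResourceContext(resource, eventRecorder);
        }

        /** The service picks up the recorder from the context, not from the factory. */
        Service getFlinkService(ResourceContext ctx) {
            return new Service(ctx.getEventRecorder());
        }
    }

    public static void main(String[] args) {
        ContextFactory factory = new ContextFactory();
        Recorder sessionJobRecorder = new Recorder("session-job");

        ResourceContext ctx = factory.getResourceContext("my-job", null, sessionJobRecorder);
        Service service = factory.getFlinkService(ctx);

        // The service and the calling controller now share one recorder instance.
        System.out.println(service.recorder == sessionJobRecorder); // prints true
    }
}
```

With this shape, a controller that passes its own recorder into {{getResourceContext(...)}} gets a service bound to that same instance, so any state later added to the recorder stays consistent within a reconcile.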
h1. Benefits
* Removes the latent bug where {{FlinkService}}-emitted events would route
through the wrong recorder once per-instance state is added.
* Aligns the runtime behavior with the mental model implied by the {{var
eventRecorder = EventRecorder.create(...)}} pattern already present in
{{registerSessionJobController}} and {{registerSnapshotController}}.
* Keeps shared singletons shared. The factory continues to own
{{clientExecutorService}}, {{artifactManager}}, {{resourceMetricGroups}}, and
{{lastRecordedExceptionCache}}, so no thread pools or caches get duplicated.
* Makes asymmetric paths (the blue-green controller, or the snapshot controller
operating on a {{FlinkDeployment}} it does not own) explicit rather than
implicit, since each call site has to pick which recorder to pass.
h1. Out of scope
* No behavioral change is intended in this ticket. Existing tests must continue
to pass without modification beyond the mechanical signature updates.
* Other follow-ups that build on this (per-controller dedup, richer listener
wiring, event metrics) are deliberately not part of this change.