Dennis-Mircea Ciupitu created FLINK-39752:
---------------------------------------------

             Summary: Thread EventRecorder through FlinkResourceContext instead of holding it on the factory
                 Key: FLINK-39752
                 URL: https://issues.apache.org/jira/browse/FLINK-39752
             Project: Flink
          Issue Type: Improvement
          Components: Kubernetes Operator
    Affects Versions: kubernetes-operator-1.15.0
            Reporter: Dennis-Mircea Ciupitu
             Fix For: kubernetes-operator-1.16.0


h1. Background

{{FlinkResourceContextFactory}} currently holds an {{EventRecorder}} as a 
constructor-injected field. That recorder is created once at {{FlinkOperator}} 
construction time and shared across every controller, regardless of which 
controller is calling {{getResourceContext(...)}}.

At the same time, {{FlinkSessionJobController}} and 
{{FlinkStateSnapshotController}} each create their own local {{EventRecorder}} 
via {{EventRecorder.create(client, listeners)}} during {{register*Controller}}, 
and {{FlinkBlueGreenDeploymentController}} has none at all. The result is an 
inconsistency:

* Reconcile code in a controller emits events through that controller's local 
recorder.
* The {{FlinkService}} built inside the same reconcile (via 
{{ctxFactory.getFlinkService(ctx)}}) emits events through the operator-scoped 
recorder held on the factory.

In other words, a single reconcile uses two different {{EventRecorder}} 
instances for events that conceptually belong to the same controller.
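The asymmetry can be made concrete with a minimal, self-contained sketch. {{Recorder}}, {{Service}} and {{Factory}} below are hypothetical, simplified stand-ins for {{EventRecorder}}, {{FlinkService}} and {{FlinkResourceContextFactory}}, not the operator's real classes (which take a {{KubernetesClient}}, listeners and JOSDK types). Each event is logged with the label of the recorder instance it passed through.

```java
import java.util.ArrayList;
import java.util.List;

final class CurrentWiring {

    // Hypothetical stand-in for EventRecorder (not the real API):
    // logs "<recorder label>:<reason>" so we can see which instance was used.
    static final class Recorder {
        private final String label;
        private final List<String> log;

        Recorder(String label, List<String> log) {
            this.label = label;
            this.log = log;
        }

        void trigger(String reason) {
            log.add(label + ":" + reason);
        }
    }

    // Stand-in for FlinkService: emits through the recorder it was built with.
    static final class Service {
        private final Recorder recorder;

        Service(Recorder recorder) {
            this.recorder = recorder;
        }

        void submitJob() {
            recorder.trigger("JobSubmitted");
        }
    }

    // Stand-in for today's factory: one operator-scoped recorder is captured
    // at construction time and used for every service it builds.
    static final class Factory {
        private final Recorder operatorRecorder;

        Factory(Recorder operatorRecorder) {
            this.operatorRecorder = operatorRecorder;
        }

        Service getFlinkService() {
            return new Service(operatorRecorder);
        }
    }

    // Simulates one session-job reconcile and returns the event log.
    static List<String> oneReconcile() {
        List<String> log = new ArrayList<>();
        // Built once when the operator starts and held by the factory.
        Factory factory = new Factory(new Recorder("operator", log));
        // Built locally in registerSessionJobController.
        Recorder sessionJobRecorder = new Recorder("session-job", log);

        sessionJobRecorder.trigger("SpecChanged"); // reconcile code path
        factory.getFlinkService().submitJob();     // FlinkService path
        return log;
    }

    public static void main(String[] args) {
        // Prints [session-job:SpecChanged, operator:JobSubmitted]
        System.out.println(oneReconcile());
    }
}
```

One reconcile, two recorder instances: the second event carries the {{operator}} label even though it was triggered on behalf of the session-job controller.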

h1. Why this matters

Today {{EventRecorder}} is effectively stateless. Every public method takes the 
{{KubernetesClient}} as a parameter and forwards to {{EventUtils}}, and the 
only instance state is two {{BiConsumer}} closures over {{client}} and 
{{listeners}}. Two recorders constructed with the same {{client}} and the same 
{{listeners}} behave identically, so the asymmetry described above does not 
cause observable bugs in production right now.

It is, however, a latent bug. The moment any per-instance state is added to 
{{EventRecorder}} (per-controller dedup cache, per-controller listener subset, 
per-controller metrics, rate limiting), events emitted by {{FlinkService}} will 
silently route through the wrong recorder. That failure mode is hard to detect 
in review because the divergence spans two files and the type signatures look 
correct.
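A sketch of the failure mode, under one explicit assumption: that a per-instance dedup cache is added to the recorder. {{DedupRecorder}} is hypothetical and does not describe the current {{EventRecorder}}; the {{sink}} list stands in for the Kubernetes Events API, and the resource and reason names are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class LatentBugDemo {

    // Hypothetical recorder WITH per-instance state: a dedup cache keyed by
    // "<resource>/<reason>". Today's EventRecorder has no such state; this
    // models the kind of addition the ticket anticipates.
    static final class DedupRecorder {
        private final Set<String> seen = new HashSet<>();
        private final List<String> sink; // stands in for the Kubernetes Events API

        DedupRecorder(List<String> sink) {
            this.sink = sink;
        }

        void trigger(String resource, String reason) {
            String key = resource + "/" + reason;
            // Set.add returns false if THIS instance already recorded the key.
            if (seen.add(key)) {
                sink.add(key);
            }
        }
    }

    // Emits the same logical event twice in one reconcile: once from
    // controller code and once from FlinkService. Returns how many events
    // reach the sink.
    static int eventsReachingApi(boolean serviceUsesControllerRecorder) {
        List<String> sink = new ArrayList<>();
        DedupRecorder controllerRecorder = new DedupRecorder(sink);
        DedupRecorder serviceRecorder = serviceUsesControllerRecorder
                ? controllerRecorder          // one recorder per controller
                : new DedupRecorder(sink);    // separate, operator-scoped recorder

        controllerRecorder.trigger("my-session-job", "ExampleReason");
        serviceRecorder.trigger("my-session-job", "ExampleReason");
        return sink.size();
    }

    public static void main(String[] args) {
        System.out.println(eventsReachingApi(true));  // 1: duplicate suppressed
        System.out.println(eventsReachingApi(false)); // 2: dedup silently bypassed
    }
}
```

Nothing throws in the split case: each instance only consults its own cache, so the second copy is never recognized as a duplicate. That is why the divergence is hard to catch in review.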

The current naming pattern (a local {{var eventRecorder = EventRecorder.create(...)}} in 
{{registerSessionJobController}} and {{registerSnapshotController}}) also misleads 
readers into thinking each controller owns its recorder end-to-end, when in 
fact the {{FlinkService}} built through the factory still uses the 
operator-scoped one.

h1. Proposal

Thread {{EventRecorder}} through the context lifecycle instead of holding it on 
the factory.

* {{FlinkResourceContextFactory}} no longer takes an {{EventRecorder}} as a 
constructor argument.
* {{FlinkResourceContext}} (and its concrete subclasses) gains an 
{{EventRecorder}} field.
* {{getResourceContext(resource, josdkContext, eventRecorder)}} accepts the 
recorder per call and stores it on the context.
* {{getFlinkService(ctx)}} reads the recorder via {{ctx.getEventRecorder()}} 
instead of from a factory field.
* Each {{register*Controller}} method in {{FlinkOperator}} owns its own 
{{EventRecorder}} (including {{registerBlueGreenController}}, which gains one 
for the first time) and passes it into the controller. Every 
{{ctxFactory.getResourceContext(...)}} call site in production code passes the 
calling controller's recorder.

This mirrors how {{josdkContext}} is already handled: both are supplied by the 
caller on each {{getResourceContext(...)}} call rather than being 
factory-lifetime dependencies.
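A rough sketch of the proposed threading, using hypothetical, simplified stand-ins again. The real signatures differ; in particular {{josdkContext}} is modelled here as a plain {{Object}} and the resource as a {{String}}. The point is only the data flow: the factory has no recorder field, the context carries the caller's recorder, and {{getFlinkService(ctx)}} reads it from the context.

```java
import java.util.ArrayList;
import java.util.List;

final class ProposedWiring {

    // Hypothetical stand-in for EventRecorder: logs "<label>:<reason>".
    static final class Recorder {
        private final String label;
        private final List<String> log;

        Recorder(String label, List<String> log) {
            this.label = label;
            this.log = log;
        }

        void trigger(String reason) {
            log.add(label + ":" + reason);
        }
    }

    // Stand-in for FlinkService.
    static final class Service {
        private final Recorder recorder;

        Service(Recorder recorder) {
            this.recorder = recorder;
        }

        void submitJob() {
            recorder.trigger("JobSubmitted");
        }
    }

    // Stand-in for FlinkResourceContext: carries the caller's recorder
    // next to the resource and the JOSDK context.
    static final class ResourceContext {
        private final String resource;
        private final Object josdkContext;
        private final Recorder eventRecorder;

        ResourceContext(String resource, Object josdkContext, Recorder eventRecorder) {
            this.resource = resource;
            this.josdkContext = josdkContext;
            this.eventRecorder = eventRecorder;
        }

        String getResource() { return resource; }
        Object getJosdkContext() { return josdkContext; }
        Recorder getEventRecorder() { return eventRecorder; }
    }

    // Stand-in for FlinkResourceContextFactory: no recorder constructor argument.
    static final class Factory {
        ResourceContext getResourceContext(String resource, Object josdkContext, Recorder eventRecorder) {
            return new ResourceContext(resource, josdkContext, eventRecorder);
        }

        Service getFlinkService(ResourceContext ctx) {
            // Read the recorder from the context, not from a factory field.
            return new Service(ctx.getEventRecorder());
        }
    }

    // Simulates one session-job reconcile and returns the event log.
    static List<String> oneReconcile() {
        List<String> log = new ArrayList<>();
        Factory factory = new Factory();
        // Owned by registerSessionJobController and passed into the controller.
        Recorder sessionJobRecorder = new Recorder("session-job", log);

        ResourceContext ctx = factory.getResourceContext("my-session-job", new Object(), sessionJobRecorder);
        ctx.getEventRecorder().trigger("SpecChanged"); // reconcile code path
        factory.getFlinkService(ctx).submitJob();      // FlinkService path
        return log;
    }

    public static void main(String[] args) {
        // Prints [session-job:SpecChanged, session-job:JobSubmitted]
        System.out.println(oneReconcile());
    }
}
```

Both events from the reconcile now go through the controller's own recorder, so any per-instance state added later would apply to the {{FlinkService}} path as well.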

h1. Benefits

* Removes the latent bug where {{FlinkService}}-emitted events would route 
through the wrong recorder once per-instance state is added.
* Aligns the runtime behavior with the mental model implied by the 
{{var eventRecorder = EventRecorder.create(...)}} pattern already present in 
{{registerSessionJobController}} and {{registerSnapshotController}}.
* Keeps shared singletons shared. The factory continues to own 
{{clientExecutorService}}, {{artifactManager}}, {{resourceMetricGroups}}, and 
{{lastRecordedExceptionCache}}, so no thread pools or caches get duplicated.
* Makes asymmetric paths (blue-green, snapshot operating on a 
{{FlinkDeployment}} it does not own) explicit rather than implicit, since each 
call site has to pick which recorder to pass.
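The last two benefits can be sketched together: a single shared factory keeps a factory-lifetime singleton (modelled here by one map standing in for caches such as {{lastRecordedExceptionCache}}), while two asymmetric paths each pass their own recorder when resolving a context for the same {{FlinkDeployment}}. All types, labels and the {{"my-deployment"}} name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExplicitRecorderChoice {

    // Hypothetical stand-in recorder: logs "<label>:<resource>:<reason>".
    static final class Recorder {
        private final String label;
        private final List<String> log;

        Recorder(String label, List<String> log) {
            this.label = label;
            this.log = log;
        }

        void trigger(String resource, String reason) {
            log.add(label + ":" + resource + ":" + reason);
        }
    }

    // Stand-in context: the recorder comes from the caller, the cache from the factory.
    static final class ResourceContext {
        final String resource;
        final Recorder eventRecorder;
        final Map<String, String> sharedCache;

        ResourceContext(String resource, Recorder eventRecorder, Map<String, String> sharedCache) {
            this.resource = resource;
            this.eventRecorder = eventRecorder;
            this.sharedCache = sharedCache;
        }
    }

    // Stand-in factory: owns one factory-lifetime cache, takes the recorder per call.
    static final class Factory {
        final Map<String, String> sharedCache = new ConcurrentHashMap<>();

        ResourceContext getResourceContext(String resource, Recorder eventRecorder) {
            return new ResourceContext(resource, eventRecorder, sharedCache);
        }
    }

    // Two asymmetric paths resolve a context for the same FlinkDeployment;
    // each call site has to choose which recorder to pass.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Factory factory = new Factory(); // one factory for the whole operator
        Recorder blueGreen = new Recorder("blue-green", log);
        Recorder snapshot = new Recorder("snapshot", log);

        ResourceContext fromBlueGreen = factory.getResourceContext("my-deployment", blueGreen);
        ResourceContext fromSnapshot = factory.getResourceContext("my-deployment", snapshot);
        fromBlueGreen.eventRecorder.trigger(fromBlueGreen.resource, "ExampleReason");
        fromSnapshot.eventRecorder.trigger(fromSnapshot.resource, "ExampleReason");

        // Same cache instance behind both contexts: nothing was duplicated.
        System.out.println(fromBlueGreen.sharedCache == fromSnapshot.sharedCache); // true
        return log;
    }

    public static void main(String[] args) {
        // Prints [blue-green:my-deployment:ExampleReason, snapshot:my-deployment:ExampleReason]
        System.out.println(run());
    }
}
```

Events about the same deployment are attributed to the controller that emitted them, while the factory's shared state is still a single instance.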

h1. Out of scope

* No behavioral change is intended in this ticket. Existing tests must continue 
to pass without modification beyond the mechanical signature updates.
* Follow-ups that build on this (per-controller dedup, richer listener 
wiring, event metrics) are deliberately not part of this change.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
