Hey Danny,
Thanks for digging deeper into this topic!
Indeed, we’ve been giving thought to most of these points, but you’ve raised a
couple of interesting ones. Here are some ideas (inline):
> On Nov 27, 2024, at 4:00 AM, Danny Cranmer wrote:
>
> Hello Sergio,
>
> Thank you for starting this discussion. I have a few questions.
>
> > having 2 identical pipelines running side-by-side
> How do you ensure correctness between the 2 identical pipelines? For example,
> processing time semantics or late data can result in different outputs.
> Some Sources have consumption quotas (for example, Kinesis Data Streams); this
> may end up eating into this quota and cause problems.
> How do we handle sources like Queues when they cannot be consumed
> concurrently?
Yes. For this analysis we’ve been staying away from Processing Time, since it’s
hard (even for a single pipeline) to “replay” it idempotently. The most stable
scenario so far has been Event Time with Watermarks (the solution will be
extensible to accommodate other, non-Watermark scenarios).
The Blue deployment should start from Green's most recent Checkpoint; that way
we minimize the amount of time it needs to “catch up”. With Event Time it is
easier to ensure that the catch-up portion will be replayed virtually the same
way as Green’s.
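To make the replay argument concrete, here is a rough plain-Python model (not Flink code) of why an Event Time result survives a catch-up replay while a Processing Time one does not. The 1-minute tumbling window, the function names and the sample timestamps are all hypothetical, chosen only for illustration.

```python
from collections import defaultdict

# Hypothetical window size, for illustration only: 1-minute tumbling windows.
WINDOW_MS = 60_000


def window_start(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Start of the tumbling window that a timestamp falls into."""
    return ts_ms - (ts_ms % size_ms)


def count_by_event_time(events):
    """Count events per window keyed on the event's own timestamp."""
    counts = defaultdict(int)
    for event_ts_ms, _arrival_ms in events:
        counts[window_start(event_ts_ms)] += 1
    return dict(counts)


def count_by_processing_time(events):
    """Count events per window keyed on the wall-clock arrival time."""
    counts = defaultdict(int)
    for _event_ts_ms, arrival_ms in events:
        counts[window_start(arrival_ms)] += 1
    return dict(counts)


# (event timestamp, wall-clock arrival) pairs. Green processed them live; Blue
# replays the same events from Green's checkpoint much later, in a burst.
green_live = [(1_000, 1_500), (59_000, 59_900), (61_000, 61_200)]
blue_replay = [(1_000, 500_000), (59_000, 500_010), (61_000, 500_020)]

print(count_by_event_time(green_live) == count_by_event_time(blue_replay))            # True
print(count_by_processing_time(green_live) == count_by_processing_time(blue_replay))  # False
```

The model deliberately ignores late data and out-of-orderness; it only shows that keying on the record's own timestamp makes the catch-up output independent of when Blue happens to run.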
Since startup times, checkpointing intervals and the overall nature of the
data are strictly business-specific, the user should have a configurable way
(in this case, via a “future” Watermark value) to indicate when the transition
between Blue and Green will occur. In other words, both pipelines are configured
with the same “future” Watermark value for the transition, and both pipelines
“see” the exact same events/records, so the Blue/Green Gates can both start/stop
record emission as soon as the configured Watermark is reached. The Gates are
mutually exclusive, so each record passes through one Gate or the other.
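A minimal sketch of the mutually exclusive Gates, assuming (as described above) that both pipelines observe the same Watermark alongside each record. `Gate`, `admits` and the sample values are hypothetical names for illustration, not an existing Flink API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Gate:
    """Hypothetical model of the record-level Gate; not a Flink API."""
    color: str              # "green" (currently live) or "blue" (new deployment)
    cutover_watermark: int  # the shared, configured "future" Watermark value

    def admits(self, current_watermark: int) -> bool:
        # Green emits strictly before the cutover, Blue from the cutover on,
        # so for any given Watermark exactly one of the two Gates is open.
        if self.color == "green":
            return current_watermark < self.cutover_watermark
        return current_watermark >= self.cutover_watermark


CUTOVER = 10_000
# (record, Watermark of the subtask when the record reaches the Gate); by
# assumption both pipelines observe exactly this sequence.
stream = [("a", 8_000), ("b", 9_999), ("c", 10_000), ("d", 12_000)]

green, blue = Gate("green", CUTOVER), Gate("blue", CUTOVER)
green_out = [rec for rec, wm in stream if green.admits(wm)]
blue_out = [rec for rec, wm in stream if blue.admits(wm)]
print(green_out, blue_out)  # ['a', 'b'] ['c', 'd']
```

Using a strict `<` on one side and `>=` on the other is what keeps the two Gates disjoint while still covering every record.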
I don’t have experience yet with sources that cannot be consumed concurrently;
this will be a good one to analyze.
>
> > we explore the idea of empowering the pipeline to decide, at the record
> > level, what data goes through and what doesn’t (by means of a “Gate”
> > component).
> What happens if the Green job gets ahead of the Blue job? How will you pick a
> stopping point (per channel) to stop at consistently (checkpoint barriers
> solve this already, right?). If you use Point in (event) Time, you need to
> handle idle channels.
Hopefully the answer above addresses this; if not, I’m happy to add more
clarification.
>
> > This new Controller will work in conjunction with a custom Flink Process
> > Function
> Would this rely on the user including the operator in the job graph, or would
> you somehow dynamically inject it?
> Where would you put this operator in complex Job graphs or forests?
Initially, the idea is to have a reusable ProcessFunction that users can place
in their pipelines wherever they see fit. So far it seems like a good idea to
place it towards the end of the pipeline, e.g. right before a sink, so that
most of the job's state is still computed and preserved until we know for sure
that we can tear down the old Green pipeline… or roll back.
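A rough plain-Python model of that placement (source, then business logic, then Gate, then sink). The function names and the `state` counter are hypothetical stand-ins for a job's stateful operators; the point is that the closed Gate only suppresses output, while the operators upstream of it keep processing every record.

```python
from typing import Callable, Iterable, Iterator, List, Tuple

Record = Tuple[str, int]  # (payload, current Watermark when the record is seen)


def business_logic(records: Iterable[Record], state: dict) -> Iterator[Record]:
    # Stand-in for the job's stateful operators; runs identically in both jobs.
    for payload, wm in records:
        state["seen"] = state.get("seen", 0) + 1
        yield payload.upper(), wm


def gate(records: Iterable[Record], is_open: Callable[[int], bool]) -> Iterator[Record]:
    # The Gate only filters; it never stops the operators upstream of it.
    for payload, wm in records:
        if is_open(wm):
            yield payload, wm


def run_job(source: List[Record], is_open: Callable[[int], bool],
            sink: List[str], state: dict) -> None:
    # source -> business logic -> Gate -> sink
    for payload, _wm in gate(business_logic(source, state), is_open):
        sink.append(payload)


CUTOVER = 100
source = [("x", 50), ("y", 150)]
green_sink, green_state = [], {}
blue_sink, blue_state = [], {}
run_job(source, lambda wm: wm < CUTOVER, green_sink, green_state)
run_job(source, lambda wm: wm >= CUTOVER, blue_sink, blue_state)
print(green_sink, blue_sink)                    # ['X'] ['Y']
print(green_state["seen"], blue_state["seen"])  # 2 2
```

Both jobs end up with identical upstream state, which is what makes either a teardown of Green or a rollback to it cheap.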
Another idea, though more intrusive and harder to test in an initial iteration,
is to add this as base Sink functionality; this way the sink itself would know
whether to write the record(s) or not.
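For comparison, a minimal sketch of the sink-level variant, where the decision lives in a wrapper around the actual write. `GatedSink`, `on_record` and `active_after_cutover` are hypothetical names for illustration; this does not mirror any existing Flink Sink interface.

```python
from typing import Callable, List


class GatedSink:
    """Hypothetical sink wrapper in which the sink itself decides whether to
    write; the class and its parameters are illustrative, not a Flink API."""

    def __init__(self, write: Callable[[str], None], cutover_watermark: int,
                 active_after_cutover: bool):
        self._write = write
        self._cutover = cutover_watermark
        self._active_after_cutover = active_after_cutover  # False = Green, True = Blue

    def on_record(self, record: str, current_watermark: int) -> bool:
        """Write the record only if this side owns it; return whether it was written."""
        past_cutover = current_watermark >= self._cutover
        if past_cutover == self._active_after_cutover:
            self._write(record)
            return True
        return False


green_written: List[str] = []
blue_written: List[str] = []
green = GatedSink(green_written.append, 100, active_after_cutover=False)
blue = GatedSink(blue_written.append, 100, active_after_cutover=True)
for record, wm in [("r1", 40), ("r2", 100), ("r3", 160)]:
    green.on_record(record, wm)
    blue.on_record(record, wm)
print(green_written, blue_written)  # ['r1'] ['r2', 'r3']
```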
>
> > A ConfigMap will be used to both contain the deployment parameters as well
> > as act as a communication conduit between the Controller and the two Jobs.
> Will a ConfigMap write/read cycle be fast enough? If you write into the CM
> "stop at record x" then the other Job might read past record x before it sees
> the command
Oh, for sure this can happen. That’s why the user should be able to define a
custom “future” cutover point: the time span their pipeline needs will be
entirely business-specific. For safety, our ProcessFunction could even enforce
a minimum default “time to wait” or “minimum Watermark transition value”,
measured from the moment the pipeline started processing records, or something
similar.
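One possible shape for that safety net, sketched under assumptions: the 5-minute default, the `transitionWatermark` ConfigMap key and both function names are hypothetical. A cutover requested too close to the first Watermark the pipeline saw is simply pushed out, so the slower side of a ConfigMap write/read cycle still learns about it before it is reached.

```python
# Hypothetical safety default: the cutover must sit at least 5 minutes of event
# time past the first Watermark the pipeline observed.
DEFAULT_MIN_LEAD_MS = 5 * 60 * 1000


def effective_cutover(requested_ms: int, first_watermark_ms: int,
                      min_lead_ms: int = DEFAULT_MIN_LEAD_MS) -> int:
    """Push a too-early cutover out to the minimum allowed Watermark value."""
    return max(requested_ms, first_watermark_ms + min_lead_ms)


def cutover_from_configmap(data: dict, first_watermark_ms: int) -> int:
    # ConfigMap `data` values are always strings; the key name is hypothetical.
    return effective_cutover(int(data["transitionWatermark"]), first_watermark_ms)


print(effective_cutover(1_000_000, 900_000))  # 1200000 (request too close, pushed out)
print(cutover_from_configmap({"transitionWatermark": "2000000"}, 900_000))  # 2000000
```

Both pipelines would need to apply the same clamp to the same inputs for the Gates to stay aligned; how "first Watermark" is agreed on is left open here.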
>
> > This function will listen for changes to the above mentioned ConfigMap and
> > each job will know if it’s meant to be Active or Standby, at record level.
> How will this work with exactly once sinks when you might have open
> transactions?
The same principle applies: records should be written either to Blue’s
transaction or to Green’s transaction.
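A toy model of why this holds, assuming the Gate decides before the transactional write. `ToyTransactionalSink` and `run_pipeline` are hypothetical; the sink only mimics "write into an open transaction, commit on checkpoint completion" and leaves out aborts and recovery entirely.

```python
from typing import List, Tuple

CUTOVER = 100


class ToyTransactionalSink:
    """Toy model of an exactly-once sink: writes go into an open transaction
    that becomes visible only when a checkpoint completes. Aborts and
    recovery are deliberately left out."""

    def __init__(self) -> None:
        self.open_txn: List[str] = []
        self.committed: List[str] = []

    def write(self, record: str) -> None:
        self.open_txn.append(record)

    def on_checkpoint_complete(self) -> None:
        self.committed.extend(self.open_txn)
        self.open_txn = []


def run_pipeline(stream: List[Tuple[str, int]], sink: ToyTransactionalSink,
                 active_after_cutover: bool, checkpoint_every: int = 2) -> None:
    for i, (record, wm) in enumerate(stream, start=1):
        # The Gate decides before the write, so only this side's records
        # ever enter this pipeline's open transaction.
        if (wm >= CUTOVER) == active_after_cutover:
            sink.write(record)
        if i % checkpoint_every == 0:
            sink.on_checkpoint_complete()


stream = [("r1", 90), ("r2", 99), ("r3", 100), ("r4", 130)]
green_sink, blue_sink = ToyTransactionalSink(), ToyTransactionalSink()
run_pipeline(stream, green_sink, active_after_cutover=False)
run_pipeline(stream, blue_sink, active_after_cutover=True)
print(green_sink.committed, blue_sink.committed)  # ['r1', 'r2'] ['r3', 'r4']
```

Because each job's open transaction only ever holds its own side of the cutover, the two jobs' transactions never contain the same record, regardless of when each one's checkpoints complete.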
Another way to illustrate these components is:
- Once a transition is initiated, Green’s Gate PF becomes Standby and
  Blue’s Gate PF becomes Active; however, they open or close gradually (and
  mutually exclusively) in this fashion:
  - Green’s Gate will taper off the emission of records as soon
    as each subtask’s Watermark crosses the configured transition value
  - Blue’s Gate will gradually start emitting records a