Hi Aleksandr,

Thanks for the adjustments; I've taken a close look at them.

The FLIP looks good. One tiny suggestion:
DelegationTokenManagerCallback could live inside DelegationTokenManager,
just like the Listener.
From my side the design is good to be implemented together with the Kafka
part.
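
To make the nesting suggestion concrete, here is a minimal sketch. It assumes
stand-in types: TokenManagerSketch and ProviderSketch are hypothetical names and
not the real Flink interfaces, and the Listener shape is only modeled on the
existing one. The FLIP text remains authoritative for the actual signatures.

```java
/** Hypothetical stand-in for DelegationTokenManager; not the real Flink interface. */
interface TokenManagerSketch {

    /** Existing-style nested listener, notified when new tokens are obtained. */
    interface Listener {
        void onNewTokensObtained(byte[] containerBytes) throws Exception;
    }

    /** Suggested placement: the callback nested next to Listener, not top-level. */
    interface DelegationTokenManagerCallback {
        void reobtainDelegationTokens();
    }
}

/** Hypothetical provider that keeps the callback handed over at init time. */
class ProviderSketch {
    private TokenManagerSketch.DelegationTokenManagerCallback callback;

    void init(TokenManagerSketch.DelegationTokenManagerCallback callback) {
        this.callback = callback;
    }

    void registerJob(String jobId) {
        // Per-job book-keeping would go here (omitted in this sketch);
        // afterwards the provider asks the manager for fresh tokens.
        callback.reobtainDelegationTokens();
    }
}
```

With this layout a provider refers to the type as
DelegationTokenManager.DelegationTokenManagerCallback, the same way it would
refer to DelegationTokenManager.Listener.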

As a side note, I now see why Kafka was suggested as the reference (apart
from it being the development expectation) :)
Applying the new job-aware token framework to filesystems involves some
non-trivial trade-offs and risks.
We don't need to sort them out now, so I'm going to +1 the vote.

My only ask is to please add me to the PR for the central part; I would
like to review and test it.

BR,
G


On Mon, Jun 29, 2026 at 8:56 PM Aleksandr Savonin <[email protected]>
wrote:

> Hi Gabor,
> Thanks for the thoughtful review. I've updated the FLIP significantly
> and included the points we discussed.
>
> On your summary:
>
>
> > FLIP-583 makes sinks JobID aware on V2 API (sources are already covered).
>
> It's actually the other way around: sinks are already JobID-aware
> today via Sink V2 InitContext.getJobInfo(), FLIP-583 adds it for
> sources (SourceReaderContext / SplitEnumeratorContext).
>
>
> > This means V2 API is hard requirement to per-job delegation tokens.
>
> Yes, correct, since the legacy SourceFunction/SinkFunction APIs don't
> expose JobInfo.
>
>
> > DelegationTokenProvider and DelegationTokenReceiver lifecycle remains
> as-is
>
> Correct.
>
>
> > We're planning to add some simple cooldown mechanism against aggressive
> obtain reschedule
>
> Correct: security.delegation.tokens.reobtain.cooldown (default 30s,
> configurable), plus coalescing of bursts. In addition, an on-demand
> re-obtain can never push an already-scheduled renewal later.
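
One possible reading of the cooldown rule above, as a small sketch. The class
and method names are hypothetical and the arithmetic is an assumption about the
intended behaviour, not the FLIP's implementation: an on-demand request waits
until the cooldown since the last obtain has elapsed, and the result is capped
by the already-scheduled renewal so it can only move the next run earlier.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper; names are illustrative and not part of Flink. */
final class ReobtainScheduleSketch {

    private ReobtainScheduleSketch() {}

    /** Returns when the next obtain should run after an on-demand request at {@code now}. */
    static Instant nextRun(
            Instant now, Instant lastObtain, Duration cooldown, Instant scheduledRenewal) {
        // Earliest moment the cooldown allows another obtain.
        Instant afterCooldown = lastObtain.plus(cooldown);
        Instant onDemand = afterCooldown.isAfter(now) ? afterCooldown : now;
        // The request may pull the next run earlier, but never push it later.
        return onDemand.isBefore(scheduledRenewal) ? onDemand : scheduledRenewal;
    }
}
```

For example, with a 30s cooldown a request arriving 10s after the last obtain
would run at the 30s mark, unless a renewal was already scheduled before that.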
>
>
> > Race in the DelegationTokenManager is valid concern and intended to be
> synchronised properly. Deadlock will be prevented via proper locking +
> knowing the involved threads
>
> Correct. I added some of the relevant information to the FLIP (only
> some, to keep the FLIP from growing).
>
>
> > Starvation can happen when ioExecutor is under heavy use. This is coming
> from original design and intended to be kept as-is.
>
> Yes, I added this information in the FLIP.
>
> I've added a section with the Kafka connector reference. I also have 2
> branches, for apache/flink and apache/flink-connector-kafka, with an e2e
> implementation and tests. I'm going to raise PRs this week (at least
> for reference purposes).
> Regarding re-obtain in a more generic way (rather than via a
> registerJob return value): I went with exactly that. registerJob now
> returns void, and the provider receives a dedicated
> DelegationTokenManagerCallback at init(Configuration,
> DelegationTokenManagerCallback), whose single method,
> reobtainDelegationTokens(), it can call from any thread. The manager
> coalesces those requests and applies a cooldown to them. So re-obtain
> is fully decoupled from registerJob.
>
> Let me know if you land on anything different after the weekend; I'm
> happy to adjust if it suits the purpose better. I always appreciate
> feedback, so if you have anything to share, I will look at it. And
> yes, please, I'd very much welcome your help reviewing the S3
> connector migration once the Kafka reference lands. I will also wait
> for your review in the flink-connector-kafka repository, if you have
> time.
>
> On Fri, 26 Jun 2026 at 10:32, Gabor Somogyi <[email protected]>
> wrote:
> >
> > Hi Aleksandr,
> >
> > FLIP-583 makes sense. That's the gap filler I was looking for.
> >
> > Making the Kafka connector the reference is fine; the point is to have at
> > least one.
> > My ask would be to add to this FLIP what Kafka as the reference would look
> > like.
> > I'm going to look at it from the perspective of how we can migrate
> > other connectors like S3.
> > Marking the key points is enough, like the rough token structure in
> > provider/receiver,
> > how a connector stores tokens (static var or similar) and how a task picks
> > an item during authentication.
> > Since source and sink have different context APIs, it is worth mentioning
> > token selection from both sides.
> >
> > Just to sum up my actual understanding (please correct me if I'm wrong):
> > - FLIP-583 makes sinks JobID aware on V2 API (sources are already
> >   covered). This means V2 API is hard requirement to per-job delegation
> >   tokens.
> > - DelegationTokenProvider and DelegationTokenReceiver lifecycle remains
> >   as-is
> > - We're planning to add some simple cooldown mechanism against aggressive
> >   obtain reschedule
> > - Race in the DelegationTokenManager is valid concern and intended to be
> >   synchronised properly <- This is key for me to understand in the PR
> > - Deadlock will be prevented via proper locking + knowing the involved
> >   threads
> > - Starvation can happen when ioExecutor is under heavy use. This is coming
> >   from original design and intended to be kept as-is.
> >
> > The only remaining thing I'm thinking about is flagging re-obtain in a
> > more generic way and not via the registerJob return value.
> > Let me think about it over the weekend; in the meantime please update the
> > FLIP, it's shaping up well.
> >
> > In general, I'm happy to help you guys with reviewing the S3 connector
> > migration.
> >
> > BR,
> > G
> >
> >
> > On Thu, Jun 25, 2026 at 7:54 PM Aleksandr Savonin <[email protected]>
> > wrote:
> >
> > > Thanks Gabor and Alan for the discussion and good points, and sorry
> > > for the late reply.
> > >
> > > On "how does a connector pick the right token": the selection key is
> > > the task's own JobID. The consuming operator already has access to it:
> > > sinks via Sink v2 InitContext.getJobInfo(), and sources via FLIP-583
> > > [1] (which exposes JobInfo on SourceReaderContext /
> > > SplitEnumeratorContext). I submitted FLIP-583 partly as a precondition
> > > for this one (though not only for that purpose).
> > >
> > > So a connector can tell "which job am I" on both sides. And there are
> > > three pieces end-to-end:
> > > 1. The task knows its JobID -> it is covered (InitContext for sinks,
> > > FLIP-583 for sources).
> > > 2. A cache on the TM side keyed by JobID, holding the per-job token,
> > > that the task looks up.
> > > 3. The connector applies that token at its authentication point.
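
The three pieces above could be sketched roughly as follows. This is an
illustration under assumptions: PerJobTokenCacheSketch is a hypothetical class,
the JobID is modeled as a String, and the token as an opaque byte array. A real
connector would take the JobID from JobInfo and decode the token itself.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical TM-side cache keyed by JobID; not an existing Flink class. */
final class PerJobTokenCacheSketch {

    private static final Map<String, byte[]> TOKENS = new ConcurrentHashMap<>();

    private PerJobTokenCacheSketch() {}

    /** Piece 2: the receiver side stores the token that arrived for a job. */
    static void put(String jobId, byte[] token) {
        TOKENS.put(jobId, token.clone());
    }

    /** Pieces 1 and 3: a task looks up the token with its own JobID when authenticating. */
    static Optional<byte[]> lookup(String jobId) {
        byte[] token = TOKENS.get(jobId);
        return token == null ? Optional.empty() : Optional.of(token.clone());
    }

    /** Drops the entry, for example when the job is no longer running on this TM. */
    static void remove(String jobId) {
        TOKENS.remove(jobId);
    }
}
```

A static map is used here only because a static variable was mentioned as one
storage option; the defensive copies keep callers from mutating cached tokens.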
> > >
> > > Regarding the example: I'd suggest connector-kafka if you agree. Kafka
> > > builds a client per task with its own SASL/OAUTHBEARER callback that
> > > can apply the per-job token by JobID. It's also the first connector I
> > > plan to onboard once the discussion is finalized; I've tested it
> > > internally and it works well. Could you please clarify whether you want
> > > to see more details/examples (with connectors/e2e?) in the FLIP document?
> > >
> > > I'll reply to the rest of the comments soon.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-583%3A+Expose+JobInfo+on+Source+contexts
> > >
> > > Kind regards,
> > > Aleksandr
> > >
> > > On Wed, 24 Jun 2026 at 13:02, Gabor Somogyi <[email protected]>
> > > wrote:
> > > >
> > > > Hi Alan,
> > > >
> > > > First of all thanks for the detailed explanation, it really helped to
> > > > be in sync!
> > > > In general, without going into too many specifics, it would be good to
> > > > solve this with the least invasive and most stable solution, no matter
> > > > what this means.
> > > > I think the starting idea is good; it would be awesome to see how this
> > > > could work end-to-end and fill the gaps if there are any.
> > > >
> > > > I've just refreshed my memory and created a small design
> > > > sketch (I can share it if you're interested, but at this point it may
> > > > not be relevant).
> > > > It's important to discuss the API, but during the design a more
> > > > fundamental question popped into my mind.
> > > >
> > > > Let's say on the receiver side there are JobID->credentials pairs. How
> > > > can any connector in a task decide which one to choose when writing a
> > > > specific file? We can take the Hadoop S3 connector
> > > > as an example to talk about concrete solutions.
> > > >
> > > > The reason I'm asking is that at the end of the day we must do the
> > > > migration, and it would be good to have a reference connector.
> > > >
> > > > BR,
> > > > G
> > > >
> > > >
> > > > On Wed, Jun 24, 2026 at 12:04 AM Alan Sheinberg via dev <
> > > > [email protected]> wrote:
> > > >
> > > > > Hi Gabor,
> > > > >
> > > > > Thanks for the thoughtful comments. I just wanted to chime in on
> some
> > > of
> > > > > the thinking Aleksandr and I have had.
> > > > >
> > > > >  Up until now DelegationTokenProvider instances were singletons and
> > > loaded
> > > > > > by the
> > > > > > service loader. Now we plan to add stop function, does that mean
> we
> > > plan
> > > > > > to change
> > > > > > the lifecycle?
> > > > >
> > > > >
> > > > > No, the lifecycle is unchanged.  It was imagined that this would
> be a
> > > > > useful hook for potentially cleaning things up, if necessary.
> > > Sometimes
> > > > > thread pools or other resources might need to be shut down neatly.
> > > > >
> > > > > Having a generic way to ask the delegation token manager to
> re-obtain
> > > is a
> > > > > > long standing
> > > > > > needed feature but didn't have time. Having a dedicated API for
> this
> > > > > would
> > > > > > be maybe
> > > > > > better instead of relying on registerJob return value.
> > > > >
> > > > >
> > > > > I agree that a general API for managing re-obtain makes sense.
> > > Generally
> > > > > the DelegationTokenProvider would likely request a re-obtain in
> > > response to
> > > > > some event.  Currently, obtainDelegationTokens() is the main hook
> that
> > > > > fetches tokens and determines when it will be called again.
> Another
> > > > > possibility could be a background thread that requests it, or the
> new
> > > > > registerJob/unregisterJob methods being proposed.
> > > > >
> > > > > A quick sketch of a possible generic interface:
> > > > >
> > > > > public interface DelegationTokenManagerCallback {
> > > > >     void reobtainDelegationTokens();
> > > > > }
> > > > >
> > > > > We could then overload the init method of DelegationTokenProvider and
> > > > > have init(Configuration config, DelegationTokenManagerCallback
> > > > > callback) so that the DelegationTokenProvider could keep a reference
> > > > > to the callback and initiate a re-obtain at will (causing a new
> > > > > refresh on the DefaultDelegationTokenManager's ioExecutor).  The
> > > > > callback logic would need to be smart about deduping calls so that
> > > > > only one was scheduled at a time in a threadsafe way.
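
A minimal sketch of the deduping idea, assuming a compare-and-set flag guards
the scheduling. CoalescingCallbackSketch and its constructor parameters are
hypothetical, and the real manager may handle this differently (for instance
together with the cooldown).

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical callback implementation; not the actual Flink code. */
final class CoalescingCallbackSketch {

    private final AtomicBoolean pending = new AtomicBoolean(false);
    private final Executor executor;
    private final Runnable obtainTokens;

    CoalescingCallbackSketch(Executor executor, Runnable obtainTokens) {
        this.executor = executor;
        this.obtainTokens = obtainTokens;
    }

    /** Safe to call from any thread; keeps at most one re-obtain queued. */
    void reobtainDelegationTokens() {
        // Only the caller that flips false -> true schedules the work.
        if (pending.compareAndSet(false, true)) {
            executor.execute(() -> {
                // Clear the flag before obtaining, so a request that arrives
                // while the obtain is running schedules one follow-up run.
                pending.set(false);
                obtainTokens.run();
            });
        }
    }
}
```

A burst of calls before the task starts therefore collapses into a single
obtain, while a request made during an obtain is not lost.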
> > > > >
> > > > > This method could then be utilized by the body of any registerJob,
> > > allowing
> > > > > the method to have a void return value.
> > > > >
> > > > > That approach is simple and could be extended in the future if you
> have
> > > > > some broader ideas on other parts of the api.  Would you rather
> > > implement
> > > > > this approach and avoid adding a special case to registerJob?
> > > > >
> > > > > > Not sure how it's planned, but new immediate re-obtain scheduling
> > > > > > would be good to be upper bounded. Some retry logic can be
> > > > > > aggressive about re-registration. Or having a cooldown is also fine.
> > > > >
> > > > >
> > > > > That makes sense to have some cool down to avoid doing it too
> often,
> > > > > however, a job might not run if it cannot initiate a re-obtain soon
> > > after
> > > > > being registered.  A configurable cooldown with a decent default
> might
> > > be a
> > > > > good choice.
> > > > >
> > > > > Last but not least up until now there was a single thread which
> played
> > > on
> > > > > > critical path on
> > > > > > immutable structures. Now we plan to change that which is fine
> but
> > > then I
> > > > > > would like to see an
> > > > > > exact plan what kind of threads are doing what and how do we
> protect
> > > > > > against
> > > > > > race/starvation/deadlock. Having an exact look is fine on the PR
> but
> > > this
> > > > > > is the gist of it
> > > > > > from my perspective.
> > > > > >
> > > > >
> > > > > In the current codebase a single thread creates
> > > > > the DefaultDelegationTokenManager and builds the immutable
> structures.
> > > Then
> > > > > DefaultDelegationTokenManager.start is called from the
> ResourceManager
> > > main
> > > > > thread and each token re-obtain is called on a thread
> > > > > from DefaultDelegationTokenManager.ioExecutor.  Therefore, fields
> > > within a
> > > > > DelegationTokenProvider must either be immutable or properly
> > > synchronized.
> > > > >
> > > > > The calls to registerJob/unregisterJob in this FLIP will come from
> the
> > > > > ResourceManager main thread, calling through to
> > > > > DefaultDelegationTokenManager and then the providers.  They are
> > > assumed to
> > > > > be non blocking and just handle book-keeping for the next re-obtain
> > > call.
> > > > > Since this pattern inherently requires updating internal fields,
> the
> > > > > DelegationTokenProvider must properly synchronize the
> methods/fields
> > > used
> > > > > for this book-keeping.  Calls to registerJob/unregisterJob aren't
> > > prevented
> > > > > from blocking and starving others, similar to
> obtainDelegationTokens.
> > > The
> > > > > contract can be made very clear in the javadoc.  Preventing races,
> > > > > starvation, or deadlock within the provider will therefore depend
> on
> > > proper
> > > > > implementation by the user.
> > > > >
> > > > > A larger reworking of DefaultDelegationTokenManager could try to do
> > > > > everything on a single thread
> > > > > (registerJob/unregisterJob/obtainDelegationTokens) to simplify this
> > > model,
> > > > > but would require using a special background thread rather than the
> > > > > ioExecutor.  I haven't considered this in detail, but would be
> open to
> > > it
> > > > > if it were strongly preferred.
> > > > >
> > > > > What I mean here specifically is that even if we schedule the
> renewal
> > > the
> > > > > > existing way
> > > > > > at least the providers list manipulation and the originally
> scheduled
> > > > > > renewal can race.
> > > > > > Maybe others since I can just imagine the change.
> > > > >
> > > > >
> > > > > I don't think we intend on changing the list of providers -- these
> are
> > > > > still immutable.   Whenever a new re-obtain is requested, it should
> > > cancel
> > > > > the originally scheduled renewal using the future as in
> > > > > DefaultDelegationTokenManager.stopTokensUpdate, ensuring just one
> > > update
> > > > > scheduled at a time.
> > > > >
> > > > > I hope I have answered a lot of your questions.  I'm happy to
> > > elaborate or
> > > > > even show a draft PR if that might be easier to trace.
> > > > >
> > > > > Thanks,
> > > > > Alan
> > > > >
> > > > > On Fri, Jun 19, 2026 at 7:50 AM Gabor Somogyi <
> > > [email protected]>
> > > > > wrote:
> > > > >
> > > > > > Hi Aleksandr,
> > > > > >
> > > > > > > Thanks for the efforts!
> > > > > > >
> > > > > > > I've missed this thread lately but have some thoughts/questions.
> > > > > >
> > > > > > Up until now one cluster per one set of user credentials was the
> > > model. I
> > > > > > think the multi-user
> > > > > > model better serves the needs so +1. We should mention this on
> the
> > > main
> > > > > > doc page later.
> > > > > >
> > > > > > Up until now DelegationTokenProvider instances were singletons
> and
> > > loaded
> > > > > > by the
> > > > > > service loader. Now we plan to add stop function, does that mean
> we
> > > plan
> > > > > > to change
> > > > > > the lifecycle?
> > > > > >
> > > > > > Having a generic way to ask the delegation token manager to
> > > re-obtain is
> > > > > a
> > > > > > long standing
> > > > > > needed feature but didn't have time. Having a dedicated API for
> this
> > > > > would
> > > > > > be maybe
> > > > > > better instead of relying on registerJob return value.
> > > > > >
> > > > > > > Not sure how it's planned, but new immediate re-obtain scheduling
> > > > > > > would be good to be upper bounded. Some retry logic can be
> > > > > > > aggressive about re-registration. Or having a cooldown is also
> > > > > > > fine.
> > > > > >
> > > > > > Last but not least up until now there was a single thread which
> > > played on
> > > > > > critical path on
> > > > > > immutable structures. Now we plan to change that which is fine
> but
> > > then I
> > > > > > would like to see an
> > > > > > exact plan what kind of threads are doing what and how do we
> protect
> > > > > > against
> > > > > > race/starvation/deadlock. Having an exact look is fine on the PR
> but
> > > this
> > > > > > is the gist of it
> > > > > > from my perspective.
> > > > > > What I mean here specifically is that even if we schedule the
> > > renewal the
> > > > > > existing way
> > > > > > at least the providers list manipulation and the originally
> scheduled
> > > > > > renewal can race.
> > > > > > Maybe others since I can just imagine the change.
> > > > > >
> > > > > > BR,
> > > > > > G
> > > > > >
> > > > > >
> > > > > > On 2026/06/05 16:35:15 Aleksandr Savonin wrote:
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Alan Sheinberg and I would like to start a discussion on
> FLIP-588:
> > > > > > > Support per-job delegation tokens [1].
> > > > > > > Flink's delegation token framework is currently cluster-scoped,
> > > which
> > > > > > > means a DelegationTokenProvider has no notion of an individual
> job.
> > > > > > > This breaks when different jobs on the same cluster need to
> > > > > > > authenticate as different identities to the same external
> service.
> > > > > > > To resolve this, the FLIP adds per-job lifecycle hooks
> > > > > > > (registerJob/unregisterJob/stop) as default methods on the
> > > > > > > DelegationTokenProvider SPI, along with the runtime wiring to
> > > invoke
> > > > > > > them on job start and stop.
> > > > > > > This change is fully backward compatible (new methods are
> default
> > > > > > > no-ops). It is worth mentioning that it widens the internal
> > > > > > > registerJobMaster RPC to carry the job configuration.
> > > > > > >
> > > > > > > Looking forward to your feedback.
> > > > > > >
> > > > > > > [1]
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-588%3A+Support+per-job+delegation+tokens
> > > > > > >
> > > > > > > --
> > > > > > > Kind regards,
> > > > > > > Aleksandr
> > > > > > >
> > > > > >
> > > > >
> > >
> > >
> > >
> > > --
> > > Kind regards,
> > > Aleksandr
> > >
>
>
>
> --
> Kind regards,
> Aleksandr
>
