Hi Aleksandr,

Thanks for the adjustments, I've had a deep look at them.

The FLIP looks good. One tiny suggestion: DelegationTokenManagerCallback can
live inside DelegationTokenManager, just like the Listener. The design looks
good from my side to implement together with the Kafka part.

As a side note, now I see why Kafka is suggested as the reference (apart from
that being the development expectation) :) Applying the new job-aware token
framework to filesystems involves some non-trivial trade-offs and risks. We
don't need to sort them out now, so I'm going to +1 the vote.

The only ask from my side: please add me to the PR for the central part, I
would like to review and test it.

BR,
G

On Mon, Jun 29, 2026 at 8:56 PM Aleksandr Savonin <[email protected]> wrote:

> Hi Gabor,
> Thanks for the thoughtful review. I've updated the FLIP accordingly
> and significantly. Included the stuff that we discussed.
>
> On your summary:
>
> > FLIP-583 makes sinks JobID aware on V2 API (sources are already covered).
>
> It's actually the other way around: sinks are already JobID-aware
> today via Sink V2 InitContext.getJobInfo(), FLIP-583 adds it for
> sources (SourceReaderContext / SplitEnumeratorContext).
>
> > This means V2 API is hard requirement to per-job delegation tokens.
>
> Yes, correct, since the legacy SourceFunction/SinkFunction APIs don't
> expose JobInfo.
>
> > DelegationTokenProvider and DelegationTokenReceiver lifecycle remains
> > as-is
>
> Correct.
>
> > We're planning to add some simple cooldown mechanism against aggressive
> > obtain reschedule
>
> Correct: security.delegation.tokens.reobtain.cooldown (default 30s,
> configurable), plus coalescing of bursts and on-demand re-obtain can
> never push an already-scheduled renewal later.
>
> > Race in the DelegationTokenManager is valid concern and intended to be
> > synchronised properly. Deadlock will be prevented via proper locking +
> > knowing the involved threads
>
> Correct, some (to keep FLIP from growing) relevant information added to
> FLIP.
>
> > Starvation can happen when ioExecutor is under heavy use. This is coming
> > from original design and intended to be kept as-is.
>
> Yes, I added this information in the FLIP.
>
> I've added the section with Connector Kafka reference. I also have 2
> branches for apache/flink and apache/flink-connector-kafka with e2e
> implementation and tests. I'm going to raise PRs this week (at least
> for a reference purpose).
> Regarding re-obtain in a more generic way (rather than via a
> registerJob return value): I went with exactly that. registerJob now
> returns void, and the provider receives a dedicated
> DelegationTokenManagerCallback at init(Configuration,
> DelegationTokenManagerCallback) whose single
> reobtainDelegationTokens() it can call from any thread, the manager
> coalesces and has a cooldown for those requests. So re-obtain is fully
> decoupled from registerJob.
>
> Let me know if you land on anything different after the weekend, happy
> to adjust if it suits the purpose better. I always appreciate the
> feedback, if you have anything you want to share, I will check. And
> yes, please, I'd very much welcome your help reviewing the S3
> connector migration once the Kafka reference lands. I will also wait
> for your review in the flink-connector-kafka repository, if you have
> time, indeed.
>
> On Fri, 26 Jun 2026 at 10:32, Gabor Somogyi <[email protected]> wrote:
> >
> > Hi Aleksandr,
> >
> > FLIP-583 makes sense. That gap filler I was looking for.
> >
> > Making the Kafka connector as reference is fine, the point is to have at
> > least one.
> > My ask would be to add in this FLIP how Kafka as reference would look like.
> > I'm going to take a look at it from that perspective how can we migrate
> > other connectors like S3.
> > Marking the key points are enough, like rough token structure in
> > provider/receiver,
> > how a connector stores tokens (static var or similar) and how task picks an
> > item during authentication.
> > Since source and sink has different context API it worth to mention token
> > selection from both side.
> >
> > Just to sum up my actual understanding (please correct me if I'm wrong):
> > - FLIP-583 makes sinks JobID aware on V2 API (sources are already covered).
> > This means V2 API is hard requirement to per-job delegation tokens.
> > - DelegationTokenProvider and DelegationTokenReceiver lifecycle remains
> > as-is
> > - We're planning to add some simple cooldown mechanism against aggressive
> > obtain reschedule
> > - Race in the DelegationTokenManager is valid concern and intended to be
> > synchronised properly <- This is key for me to understand in the PR
> > - Deadlock will be prevented via proper locking + knowing the involved
> > threads
> > - Starvation can happen when ioExecutor is under heavy use. This is coming
> > from original design and intended to be kept as-is.
> >
> > The only remaining thing what I'm thinking about is to flag re-obtain in a
> > more generic way and not via registerJob return value.
> > Let me think about it in the weekend, in the meantime plz update the FLIP,
> > it's shaping well.
> >
> > As a general saying happy to help you guys with review migrate S3 connector.
> >
> > BR,
> > G
> >
> >
> > On Thu, Jun 25, 2026 at 7:54 PM Aleksandr Savonin <[email protected]> wrote:
> >
> > > Thanks Gabor and Alan for the discussion and good points, and sorry
> > > for the late reply.
> > >
> > > On "how does a connector pick the right token": the selection key is
> > > the task's own JobID. The consuming operator already has access to it:
> > > sinks via Sink v2 InitContext.getJobInfo(), and sources via FLIP-583
> > > [1] (which exposes JobInfo on SourceReaderContext /
> > > SplitEnumeratorContext). I submitted FLIP-583 partly as a precondition
> > > for this one (though not only for that purpose).
> > >
> > > So a connector can tell "which job am I" on both sides. And there are
> > > three pieces end-to-end:
> > > 1. The task knows its JobID -> it is covered (InitContext for sinks,
> > > FLIP-583 for sources).
> > > 2. A cache on the TM side keyed by JobID, holding the per-job token,
> > > that the task looks up.
> > > 3. The connector applies that token at its authentication point.
> > >
> > > Regarding the example: I'd suggest connector-kafka if you agree. Kafka
> > > builds a client per task with its own SASL/OAUTHBEARER callback that
> > > can apply the per-job token by JobID. It's also the first connector I
> > > plan to onboard once the discussion finalizes, I've tested it
> > > internally and it works well. Could you please clarify, do you want to
> > > see more details/examples (with connectors/e2e?) in the FLIP document?
> > >
> > > I'll reply to the rest of the comments soon.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-583%3A+Expose+JobInfo+on+Source+contexts
> > >
> > > Kind regards,
> > > Aleksandr
> > >
> > > On Wed, 24 Jun 2026 at 13:02, Gabor Somogyi <[email protected]> wrote:
> > > >
> > > > Hi Alan,
> > > >
> > > > First of all thanks for the detailed explanation, it really helped to be
> > > > in sync!
> > > > As a general saying without too much specifics it would be good to solve
> > > > this with the least invasive and most stable solution no matter what this
> > > > means.
> > > > I think the starting idea is good, it would be awesome to see how this
> > > > end-to-end could work and fill the gaps if there are.
> > > >
> > > > I've just refreshed my memories and created a small design
> > > > sketch (I can share if you're interested but at this point maybe not
> > > > relevant).
> > > > It's important to discuss about the API but during the design a more
> > > > fundamental question popped into my mind.
> > > >
> > > > Let's say the receiver side there are JobID->credentials pairs.
> > > > How any connector in a task can
> > > > decide which one to choose when writing a specific file? We can take
> > > > Hadoop S3 connector
> > > > as an example to talk about concrete solutions.
> > > >
> > > > The reason why I'm asking is because at the end of the day we must do the
> > > > migration and it
> > > > would be good to have a reference connector.
> > > >
> > > > BR,
> > > > G
> > > >
> > > >
> > > > On Wed, Jun 24, 2026 at 12:04 AM Alan Sheinberg via dev <[email protected]> wrote:
> > > >
> > > > > Hi Gabor,
> > > > >
> > > > > Thanks for the thoughtful comments. I just wanted to chime in on some
> > > > > of the thinking Aleksandr and I have had.
> > > > >
> > > > > > Up until now DelegationTokenProvider instances were singletons and
> > > > > > loaded by the
> > > > > > service loader. Now we plan to add stop function, does that mean we
> > > > > > plan to change
> > > > > > the lifecycle?
> > > > >
> > > > > No, the lifecycle is unchanged. It was imagined that this would be a
> > > > > useful hook for potentially cleaning things up, if necessary. Sometimes
> > > > > thread pools or other resources might need to be shut down neatly.
> > > > >
> > > > > > Having a generic way to ask the delegation token manager to re-obtain
> > > > > > is a long standing
> > > > > > needed feature but didn't have time. Having a dedicated API for this
> > > > > > would be maybe
> > > > > > better instead of relying on registerJob return value.
> > > > >
> > > > > I agree that a general API for managing re-obtain makes sense. Generally
> > > > > the DelegationTokenProvider would likely request a re-obtain in
> > > > > response to some event. Currently, obtainDelegationTokens() is the main
> > > > > hook that fetches tokens and determines when it will be called again.
> > > > > Another possibility could be a background thread that requests it, or
> > > > > the new registerJob/unregisterJob methods being proposed.
> > > > >
> > > > > A quick sketch of a possible generic interface:
> > > > >
> > > > > public interface DelegationTokenManagerCallback {
> > > > >     void reobtainDelegationTokens();
> > > > > }
> > > > >
> > > > > We could then overload the init method of DelegationTokenProvider and
> > > > > have init(Configuration config, DelegationTokenManagerCallback callback)
> > > > > so that the DelegationTokenProvider could keep a reference to the
> > > > > callback and initiate a re-obtain at will (causing a new refresh on
> > > > > the DefaultDelegationTokenManager's ioExecutor). The callback logic
> > > > > would need to be smart about deduping calls so that only one was
> > > > > scheduled at a time in a threadsafe way.
> > > > >
> > > > > This method could then be utilized by the body of any registerJob,
> > > > > allowing the method to have a void return value.
> > > > >
> > > > > That approach is simple and could be extended in the future if you have
> > > > > some broader ideas on other parts of the api. Would you rather
> > > > > implement this approach and avoid adding a special case to registerJob?
> > > > >
> > > > > > Not sure how it's planned but new immediate re-obtain scheduling
> > > > > > would be good to be
> > > > > > upper bounded. Some retry logic can be aggressive about
> > > > > > re-registration. Or having a
> > > > > > cooldown is also fine.
> > > > >
> > > > > That makes sense to have some cool down to avoid doing it too often,
> > > > > however, a job might not run if it cannot initiate a re-obtain soon
> > > > > after being registered. A configurable cooldown with a decent default
> > > > > might be a good choice.
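
For readers following the thread, here is a minimal sketch of how the deduping
and cooldown discussed above could fit together. It is an illustration under
stated assumptions, not the FLIP's implementation: CoalescingReobtainCallback,
its Scheduler hook and the injected clock are hypothetical names. Only the
DelegationTokenManagerCallback shape and the 30s default come from the
discussion. The sketch does not model the rule that an on-demand re-obtain
never pushes an already-scheduled renewal later.

```java
import java.util.function.LongSupplier;

// Callback shape as sketched in the thread.
interface DelegationTokenManagerCallback {
    void reobtainDelegationTokens();
}

// Hypothetical helper (not Flink code): coalesces bursts of requests into one
// scheduled re-obtain and keeps a minimum gap (cooldown) between two runs.
final class CoalescingReobtainCallback implements DelegationTokenManagerCallback {

    // Hypothetical scheduling hook; a real manager would wrap an executor here.
    interface Scheduler {
        void schedule(Runnable task, long delayMillis);
    }

    private final long cooldownMillis;
    private final LongSupplier clock;
    private final Scheduler scheduler;
    private final Runnable reobtain;

    private final Object lock = new Object();
    private boolean pending;     // true while a re-obtain is scheduled but not started
    private boolean hasRun;      // false until the first re-obtain has started
    private long lastRunMillis;  // start time of the most recent re-obtain

    CoalescingReobtainCallback(
            long cooldownMillis, LongSupplier clock, Scheduler scheduler, Runnable reobtain) {
        this.cooldownMillis = cooldownMillis;
        this.clock = clock;
        this.scheduler = scheduler;
        this.reobtain = reobtain;
    }

    @Override
    public void reobtainDelegationTokens() {
        long delay;
        synchronized (lock) {
            if (pending) {
                return; // a re-obtain is already scheduled: coalesce this request
            }
            pending = true;
            long sinceLast = clock.getAsLong() - lastRunMillis;
            // The first request runs immediately; later ones wait out the cooldown.
            delay = hasRun ? Math.max(0L, cooldownMillis - sinceLast) : 0L;
        }
        scheduler.schedule(this::runOnce, delay);
    }

    private void runOnce() {
        synchronized (lock) {
            pending = false;
            hasRun = true;
            lastRunMillis = clock.getAsLong();
        }
        reobtain.run();
    }
}
```

The flag is cleared before the re-obtain itself runs, so a request that arrives
while tokens are being fetched schedules one more run after the cooldown rather
than being dropped.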
> > > > >
> > > > > > Last but not least up until now there was a single thread which
> > > > > > played on critical path on
> > > > > > immutable structures. Now we plan to change that which is fine but
> > > > > > then I would like to see an
> > > > > > exact plan what kind of threads are doing what and how do we protect
> > > > > > against
> > > > > > race/starvation/deadlock. Having an exact look is fine on the PR but
> > > > > > this is the gist of it
> > > > > > from my perspective.
> > > > >
> > > > > In the current codebase a single thread creates
> > > > > the DefaultDelegationTokenManager and builds the immutable structures.
> > > > > Then DefaultDelegationTokenManager.start is called from the
> > > > > ResourceManager main thread and each token re-obtain is called on a
> > > > > thread from DefaultDelegationTokenManager.ioExecutor. Therefore, fields
> > > > > within a DelegationTokenProvider must either be immutable or properly
> > > > > synchronized.
> > > > >
> > > > > The calls to registerJob/unregisterJob in this FLIP will come from the
> > > > > ResourceManager main thread, calling through to
> > > > > DefaultDelegationTokenManager and then the providers. They are
> > > > > assumed to be non blocking and just handle book-keeping for the next
> > > > > re-obtain call.
> > > > > Since this pattern inherently requires updating internal fields, the
> > > > > DelegationTokenProvider must properly synchronize the methods/fields
> > > > > used for this book-keeping. Calls to registerJob/unregisterJob aren't
> > > > > prevented from blocking and starving others, similar to
> > > > > obtainDelegationTokens. The
> > > > > contract can be made very clear in the javadoc. Preventing races,
> > > > > starvation, or deadlock within the provider will therefore depend on
> > > > > proper implementation by the user.
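
To make the threading contract above concrete, here is a small sketch of the
kind of book-keeping a provider might keep. It is an assumption-laden
illustration: PerJobBookkeeping, the String job IDs and the String identities
are hypothetical stand-ins, and the real registerJob/unregisterJob signatures
are whatever the FLIP defines. The point is only that registration stays
non-blocking on the ResourceManager main thread while the re-obtain thread
works on a stable snapshot.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical provider-side book-keeping; not the FLIP's SPI.
final class PerJobBookkeeping {

    // jobId -> identity to obtain a token for (plain strings for brevity).
    private final Map<String, String> identityByJob = new ConcurrentHashMap<>();

    // Would be called from the ResourceManager main thread: records and returns.
    void registerJob(String jobId, String identity) {
        identityByJob.put(jobId, identity);
    }

    // Would be called from the ResourceManager main thread when a job ends.
    void unregisterJob(String jobId) {
        identityByJob.remove(jobId);
    }

    // Would be called on the ioExecutor at the start of a re-obtain: the copy
    // is unaffected by registrations that happen while tokens are fetched.
    Map<String, String> snapshotForObtain() {
        return Collections.unmodifiableMap(new HashMap<>(identityByJob));
    }
}
```

A job registered after the snapshot is taken is simply picked up by the next
re-obtain, which is where a re-obtain request from registerJob would come in.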
> > > > >
> > > > > A larger reworking of DefaultDelegationTokenManager could try to do
> > > > > everything on a single thread
> > > > > (registerJob/unregisterJob/obtainDelegationTokens) to simplify this
> > > > > model, but would require using a special background thread rather than
> > > > > the ioExecutor. I haven't considered this in detail, but would be open
> > > > > to it if it were strongly preferred.
> > > > >
> > > > > > What I mean here specifically is that even if we schedule the renewal
> > > > > > the existing way
> > > > > > at least the providers list manipulation and the originally scheduled
> > > > > > renewal can race.
> > > > > > Maybe others since I can just imagine the change.
> > > > >
> > > > > I don't think we intend on changing the list of providers -- these are
> > > > > still immutable. Whenever a new re-obtain is requested, it should
> > > > > cancel the originally scheduled renewal using the future as in
> > > > > DefaultDelegationTokenManager.stopTokensUpdate, ensuring just one
> > > > > update scheduled at a time.
> > > > >
> > > > > I hope I have answered a lot of your questions. I'm happy to
> > > > > elaborate or even show a draft PR if that might be easier to trace.
> > > > >
> > > > > Thanks,
> > > > > Alan
> > > > >
> > > > > On Fri, Jun 19, 2026 at 7:50 AM Gabor Somogyi <[email protected]> wrote:
> > > > >
> > > > > > Hi Aleksandr,
> > > > > >
> > > > > > Thanks for efforts!
> > > > > >
> > > > > > I've missed this thread lately but have some thought/questions.
> > > > > >
> > > > > > Up until now one cluster per one set of user credentials was the
> > > > > > model. I think the multi-user
> > > > > > model better serves the needs so +1. We should mention this on the
> > > > > > main doc page later.
> > > > > >
> > > > > > Up until now DelegationTokenProvider instances were singletons and
> > > > > > loaded by the
> > > > > > service loader. Now we plan to add stop function, does that mean we
> > > > > > plan to change
> > > > > > the lifecycle?
> > > > > >
> > > > > > Having a generic way to ask the delegation token manager to
> > > > > > re-obtain is a long standing
> > > > > > needed feature but didn't have time. Having a dedicated API for this
> > > > > > would be maybe
> > > > > > better instead of relying on registerJob return value.
> > > > > >
> > > > > > Not sure how it's planned but new immediate re-obtain scheduling
> > > > > > would be good to be
> > > > > > upper bounded. Some retry logic can be aggressive about
> > > > > > re-registration. Or having a
> > > > > > cooldown is also fine.
> > > > > >
> > > > > > Last but not least up until now there was a single thread which
> > > > > > played on critical path on
> > > > > > immutable structures. Now we plan to change that which is fine but
> > > > > > then I would like to see an
> > > > > > exact plan what kind of threads are doing what and how do we protect
> > > > > > against
> > > > > > race/starvation/deadlock. Having an exact look is fine on the PR but
> > > > > > this is the gist of it
> > > > > > from my perspective.
> > > > > > What I mean here specifically is that even if we schedule the
> > > > > > renewal the existing way
> > > > > > at least the providers list manipulation and the originally scheduled
> > > > > > renewal can race.
> > > > > > Maybe others since I can just imagine the change.
> > > > > >
> > > > > > BR,
> > > > > > G
> > > > > >
> > > > > >
> > > > > > On 2026/06/05 16:35:15 Aleksandr Savonin wrote:
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Alan Sheinberg and I would like to start a discussion on FLIP-588:
> > > > > > > Support per-job delegation tokens [1].
> > > > > > > Flink's delegation token framework is currently cluster-scoped,
> > > > > > > which means a DelegationTokenProvider has no notion of an
> > > > > > > individual job.
> > > > > > > This breaks when different jobs on the same cluster need to
> > > > > > > authenticate as different identities to the same external service.
> > > > > > > To resolve this, the FLIP adds per-job lifecycle hooks
> > > > > > > (registerJob/unregisterJob/stop) as default methods on the
> > > > > > > DelegationTokenProvider SPI, along with the runtime wiring to
> > > > > > > invoke them on job start and stop.
> > > > > > > This change is fully backward compatible (new methods are default
> > > > > > > no-ops). It is worth mentioning that it widens the internal
> > > > > > > registerJobMaster RPC to carry the job configuration.
> > > > > > >
> > > > > > > Looking forward to your feedback.
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-588%3A+Support+per-job+delegation+tokens
> > > > > > >
> > > > > > > --
> > > > > > > Kind regards,
> > > > > > > Aleksandr
> > >
> > > --
> > > Kind regards,
> > > Aleksandr
>
> --
> Kind regards,
> Aleksandr
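
As a closing illustration of the three pieces listed earlier in the thread
(the task knows its JobID, a TM-side cache keyed by JobID holds the per-job
token, the connector applies that token at its authentication point), here is
a minimal sketch of the cache part. Everything in it is an assumption made for
illustration: PerJobTokenCache is a hypothetical name, job IDs are plain
strings and tokens are raw byte arrays. It follows the "static var or similar"
idea mentioned above and is not the Kafka reference implementation.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical TM-side cache (static, as suggested in the thread).
final class PerJobTokenCache {

    private static final Map<String, byte[]> TOKENS = new ConcurrentHashMap<>();

    private PerJobTokenCache() {}

    // Receiver side: store or replace the token delivered for one job.
    static void update(String jobId, byte[] token) {
        TOKENS.put(jobId, token.clone());
    }

    // Receiver side: forget the token once the job is gone.
    static void remove(String jobId) {
        TOKENS.remove(jobId);
    }

    // Task side: look up the token for the task's own JobID. The result is
    // empty if no token has been delivered for that job yet.
    static Optional<byte[]> lookup(String jobId) {
        byte[] token = TOKENS.get(jobId);
        return token == null ? Optional.empty() : Optional.of(token.clone());
    }
}
```

A connector would call lookup with the JobID it reads from its context (sinks
via InitContext.getJobInfo(), sources via the contexts extended by FLIP-583)
and hand the result to its authentication callback.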
