Hi Robert,

That would be awesome.
My cwiki username: gaborgsomogyi G On Fri, Jan 28, 2022 at 5:06 PM Robert Metzger <metrob...@gmail.com> wrote: > Hey Gabor, > > let me know your cwiki username, and I can give you write permissions. > > > On Fri, Jan 28, 2022 at 4:05 PM Gabor Somogyi <gabor.g.somo...@gmail.com> > wrote: > > > Thanks for making the design better! No further thing to discuss from my > > side. > > > > Started to reflect the agreement in the FLIP doc. > > Since I don't have access to the wiki I need to ask Marci to do that > which > > may take some time. > > > > G > > > > > > On Fri, Jan 28, 2022 at 3:52 PM David Morávek <d...@apache.org> wrote: > > > > > Hi, > > > > > > AFAIU an under registration TM is not added to the registered TMs map > > until > > > > RegistrationResponse .. > > > > > > > > > > I think you're right, with a careful design around threading > (delegating > > > update broadcasts to the main thread) + synchronous initial update > (that > > > would be nice to avoid) this should be doable. > > > > > > Not sure what you mean "we can't register the TM without providing it > > with > > > > token" but in unsecure configuration registration must happen w/o > > tokens. > > > > > > > > > > Exactly as you describe it, this was meant only for the "kerberized / > > > secured" cluster case, in other cases we wouldn't enforce a non-null > > token > > > in the response > > > > > > I think this is a good idea in general. > > > > > > > > > > +1 > > > > > > If you don't have any more thoughts on the RPC / lifecycle part, can > you > > > please reflect it into the FLIP? > > > > > > D. 
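The threading point above (delegating update broadcasts to the RPC main thread so a token refresh cannot interleave with a TM registration) can be sketched roughly as follows. This is an illustrative Python sketch, not Flink code; `TokenCoordinator`, `register_tm` and `refresh` are invented names standing in for the RM/DTM logic, with a single-worker executor playing the role of the RPC main thread:

```python
# Hypothetical sketch: serialize TM registration and token broadcasts through one
# single-threaded executor, so ordering between the two is always deterministic.
from concurrent.futures import ThreadPoolExecutor


class TokenCoordinator:
    def __init__(self):
        # One worker thread stands in for the RPC main thread.
        self._main = ThreadPoolExecutor(max_workers=1)
        self._registered = {}        # tm_id -> last token pushed to that TM
        self._current_token = None

    def register_tm(self, tm_id):
        # The registration response carries the current token, so (in the
        # secured-cluster case) a TM is never registered without one.
        def task():
            self._registered[tm_id] = self._current_token
            return self._current_token
        return self._main.submit(task).result()

    def refresh(self, token):
        # The broadcast runs on the same thread, so it always sees a
        # consistent view of the registered-TM map.
        def task():
            self._current_token = token
            for tm_id in self._registered:
                self._registered[tm_id] = token
        self._main.submit(task).result()
```

Because both paths funnel through the same thread, a refresh either happens entirely before a registration (the TM gets the new token in its response) or entirely after it (the TM gets it via the broadcast), never half-way.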
> > > > > > On Fri, Jan 28, 2022 at 3:16 PM Gabor Somogyi < > gabor.g.somo...@gmail.com > > > > > > wrote: > > > > > > > > - Make sure DTs issued by single DTMs are monotonically increasing > > (can > > > > be > > > > sorted on TM side) > > > > > > > > AFAIU an under registration TM is not added to the registered TMs map > > > until > > > > RegistrationResponse > > > > is processed which would contain the initial tokens. If that's true > > then > > > > how is it possible to have race with > > > > DTM update which is working on the registered TMs list? > > > > To be more specific "taskExecutors" is the registered map of TMs to > > which > > > > DTM can send updated tokens > > > > but this doesn't contain the under registration TM while > > > > RegistrationResponse is not processed, right? > > > > > > > > Of course if DTM can update while RegistrationResponse is processed > > then > > > > somehow sorting would be > > > > required and that case I would agree. > > > > > > > > - Scope DT updates by the RM ID and ensure that TM only accepts > update > > > from > > > > the current leader > > > > > > > > I've planned this initially the mentioned way so agreed. > > > > > > > > - Return initial token with the RegistrationResponse, which should > make > > > the > > > > RPC contract bit clearer (ensure that we can't register the TM > without > > > > providing it with token) > > > > > > > > I think this is a good idea in general. Not sure what you mean "we > > can't > > > > register the TM without > > > > providing it with token" but in unsecure configuration registration > > must > > > > happen w/o tokens. > > > > All in all the newly added tokens field must be somehow optional. 
> > > > G
> > > >
> > > > On Fri, Jan 28, 2022 at 2:22 PM David Morávek <d...@apache.org> wrote:
> > > >
> > > > > We had a long discussion with Chesnay about the possible edge cases
> > > > > and it basically boils down to the following two scenarios:
> > > > >
> > > > > 1) There is a possible race condition between TM registration (the
> > > > > first DT update) and token refresh if they happen simultaneously.
> > > > > Then the registration might beat the refreshed token. This could be
> > > > > easily addressed if DTs could be sorted (eg. by the expiration time)
> > > > > on the TM side. In other words, if there are multiple updates at the
> > > > > same time we need to make sure that we have a deterministic way of
> > > > > choosing the latest one.
> > > > >
> > > > > One idea by Chesnay that popped up during this discussion was whether
> > > > > we could simply return the initial token with the RegistrationResponse
> > > > > to avoid making an extra call during the TM registration.
> > > > >
> > > > > 2) When the RM leadership changes (eg. because the zookeeper session
> > > > > times out) there might be a race condition where the old RM is
> > > > > shutting down and updates the tokens, so that it might again beat the
> > > > > registration token of the new RM. This could be avoided if we scope
> > > > > the token by _ResourceManagerId_ and only accept updates for the
> > > > > current leader (basically we'd have an extra parameter to the
> > > > > _updateDelegationToken_ method).
> > > > >
> > > > > -
> > > > >
> > > > > DTM is way simpler than, for example, slot management, which could
> > > > > receive updates from the JobMaster that RM might not know about.
> > > > > > > > > > So if you want to go in the path you're describing it should be > > doable > > > > and > > > > > we'd propose following to cover all cases: > > > > > > > > > > - Make sure DTs issued by single DTMs are monotonically increasing > > (can > > > > be > > > > > sorted on TM side) > > > > > - Scope DT updates by the RM ID and ensure that TM only accepts > > update > > > > from > > > > > the current leader > > > > > - Return initial token with the RegistrationResponse, which should > > make > > > > the > > > > > RPC contract bit clearer (ensure that we can't register the TM > > without > > > > > providing it with token) > > > > > > > > > > Any thoughts? > > > > > > > > > > > > > > > On Fri, Jan 28, 2022 at 10:53 AM Gabor Somogyi < > > > > gabor.g.somo...@gmail.com> > > > > > wrote: > > > > > > > > > > > Thanks for investing your time! > > > > > > > > > > > > The first 2 bulletpoint are clear. > > > > > > If there is a chance that a TM can go to an inconsistent state > > then I > > > > > agree > > > > > > with the 3rd bulletpoint. > > > > > > Just before we agree on that I would like to learn something new > > and > > > > > > understand how is it possible that a TM > > > > > > gets corrupted? (In Spark I've never seen such thing and no > > mechanism > > > > to > > > > > > fix this but Flink is definitely not Spark) > > > > > > > > > > > > Here is my understanding: > > > > > > * DTM pushes new obtained DTs to TMs and if any exception occurs > > > then a > > > > > > retry after "security.kerberos.tokens.retry-wait" > > > > > > happens. This means DTM retries until it's not possible to send > new > > > DTs > > > > > to > > > > > > all registered TMs. > > > > > > * New TM registration must fail if "updateDelegationToken" fails > > > > > > * "updateDelegationToken" fails consistently like a DB (at least > I > > > plan > > > > > to > > > > > > implement it that way). 
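The first two safeguards proposed above (tokens that can be sorted on the TM side, and updates fenced by the current RM leader) could look roughly like this on the receiving end. This is an illustrative Python sketch, not a Flink class; `TaskExecutorTokenStore` is an invented name, and tokens are modelled as `(expiration_ts, payload)` pairs so they can be ordered deterministically:

```python
# Hypothetical sketch of the TM-side acceptance logic: a late update from an old
# leader, or an older (or duplicate) token, is simply dropped.
class TaskExecutorTokenStore:
    def __init__(self, current_leader_id):
        self.current_leader_id = current_leader_id
        self.token = None                  # (expiration_ts, payload) or None

    def update(self, leader_id, token):
        """Return True if the token was accepted, False if it was dropped."""
        if leader_id != self.current_leader_id:
            return False                   # update from a stale ResourceManager
        if self.token is not None and token[0] <= self.token[0]:
            return False                   # not newer than what we already hold
        self.token = token
        return True
```

With this in place the ordering of concurrent updates (registration vs. refresh, old RM vs. new RM) stops mattering: the TM converges on the newest token from the current leader regardless of arrival order.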
> > > > > > If DTs are arriving on the TM side then a single > > > > > > "UserGroupInformation.getCurrentUser.addCredentials" > > > > > > will be called which I've never seen it failed. > > > > > > * I hope all other code parts are not touching existing DTs > within > > > the > > > > > JVM > > > > > > > > > > > > I would like to emphasize I'm not against to add it just want to > > see > > > > what > > > > > > kind of problems are we facing. > > > > > > It would ease to catch bugs earlier and help in the maintenance. > > > > > > > > > > > > All in all I would buy the idea to add the 3rd bullet if we > foresee > > > the > > > > > > need. > > > > > > > > > > > > G > > > > > > > > > > > > > > > > > > On Fri, Jan 28, 2022 at 10:07 AM David Morávek <d...@apache.org> > > > > wrote: > > > > > > > > > > > > > Hi Gabor, > > > > > > > > > > > > > > This is definitely headed in a right direction +1. > > > > > > > > > > > > > > I think we still need to have a safeguard in case some of the > TMs > > > > gets > > > > > > into > > > > > > > the inconsistent state though, which will also eliminate the > need > > > for > > > > > > > implementing a custom retry mechanism (when > > _updateDelegationToken_ > > > > > call > > > > > > > fails for some reason). > > > > > > > > > > > > > > We already have this safeguard in place for slot pool (in case > > > there > > > > > are > > > > > > > some slots in inconsistent state - eg. we haven't freed them > for > > > some > > > > > > > reason) and for the partition tracker, which could be simply > > > > enhanced. > > > > > > This > > > > > > > is done via periodic heartbeat from TaskManagers to the > > > > ResourceManager > > > > > > > that contains report about state of these two components (from > TM > > > > > > > perspective) so the RM can reconcile their state if necessary. 
> > > > > > > I don't think adding an additional field to
> > > > > > > _TaskExecutorHeartbeatPayload_ should be a concern as we only
> > > > > > > heartbeat every ~10s by default and the new field would be small
> > > > > > > compared to the rest of the existing payload. Also the heartbeat
> > > > > > > doesn't need to contain the whole DT, but just some identifier
> > > > > > > which signals whether it uses the right one, which could be
> > > > > > > significantly smaller.
> > > > > > >
> > > > > > > This is still a PUSH based approach as the RM would again call the
> > > > > > > newly introduced _updateDelegationToken_ when it encounters an
> > > > > > > inconsistency (eg. due to a temporary network partition / a race
> > > > > > > condition we didn't test for / some other scenario we didn't think
> > > > > > > about). In practice these inconsistencies are super hard to avoid
> > > > > > > and reason about (and unfortunately yes, we see them happen from
> > > > > > > time to time), so reusing the existing mechanism that is designed
> > > > > > > for this exact problem simplifies things.
> > > > > > >
> > > > > > > To sum this up we'd have three code paths for calling
> > > > > > > _updateDelegationToken_:
> > > > > > > 1) When the TM registers, we push the token (if DTM already has it) to it
> > > > > > > 2) When DTM obtains a new token it broadcasts it to all currently
> > > > > > > connected TMs
> > > > > > > 3) When a TM gets out of sync, DTM would reconcile its state
> > > > > > >
> > > > > > > WDYT?
> > > > > > >
> > > > > > > Best,
> > > > > > > D.
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jan 26, 2022 at 9:03 PM David Morávek <d...@apache.org> wrote:
> > > > > > >
> > > > > > > > Thanks for the update, I'll go over it tomorrow.
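The reconciliation path (code path 3 above) can be sketched as follows. This is a hypothetical illustration in Python, not the actual _TaskExecutorHeartbeatPayload_ change; the only point it demonstrates is that the heartbeat carries a small token identifier rather than the whole DT, and the RM re-pushes the full token when the reported identifier differs from the expected one:

```python
# Hedged sketch of heartbeat-based reconciliation (all names are made up).
import hashlib


def token_id(token_bytes):
    # A short digest stands in for the full delegation token in the heartbeat
    # payload, keeping the added field small.
    return hashlib.sha256(token_bytes).hexdigest()[:16]


def reconcile_on_heartbeat(rm_token, reported_id, push):
    """Call push(rm_token) only when the TM reports a different token id."""
    if reported_id != token_id(rm_token):
        push(rm_token)     # TM is out of sync: re-push the current token
        return True
    return False           # TM already holds the right token, nothing to do
```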
> > > > > > > > > > > > > > > > On Wed, Jan 26, 2022 at 5:33 PM Gabor Somogyi < > > > > > > gabor.g.somo...@gmail.com > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > >> Hi All, > > > > > > > >> > > > > > > > >> Since it has turned out that DTM can't be added as member of > > > > > JobMaster > > > > > > > >> < > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176 > > > > > > > >> > > > > > > > > >> I've > > > > > > > >> came up with a better proposal. > > > > > > > >> David, thanks for pinpointing this out, you've caught a bug > in > > > the > > > > > > early > > > > > > > >> phase! > > > > > > > >> > > > > > > > >> Namely ResourceManager > > > > > > > >> < > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124 > > > > > > > >> > > > > > > > > >> is > > > > > > > >> a single instance class where DTM can be added as member > > > variable. > > > > > > > >> It has a list of all already registered TMs and new TM > > > > registration > > > > > is > > > > > > > >> also > > > > > > > >> happening here. 
> > > > > > > >> The following can be added from logic perspective to be more > > > > > specific: > > > > > > > >> * Create new DTM instance in ResourceManager > > > > > > > >> < > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124 > > > > > > > >> > > > > > > > > >> and > > > > > > > >> start it (re-occurring thread to obtain new tokens) > > > > > > > >> * Add a new function named "updateDelegationTokens" to > > > > > > > TaskExecutorGateway > > > > > > > >> < > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java#L54 > > > > > > > >> > > > > > > > > >> * Call "updateDelegationTokens" on all registered TMs to > > > propagate > > > > > new > > > > > > > DTs > > > > > > > >> * In case of new TM registration call > "updateDelegationTokens" > > > > > before > > > > > > > >> registration succeeds to setup new TM properly > > > > > > > >> > > > > > > > >> This way: > > > > > > > >> * only a single DTM would live within a cluster which is the > > > > > expected > > > > > > > >> behavior > > > > > > > >> * DTM is going to be added to a central place where all > > > deployment > > > > > > > target > > > > > > > >> can make use of it > > > > > > > >> * DTs are going to be pushed to TMs which would generate > less > > > > > network > > > > > > > >> traffic than pull based approach > > > > > > > >> (please see my previous mail where I've described both > > > approaches) > > > > > > > >> * HA scenario is going to be consistent because such > > > > > > > >> < > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > 
https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1069 > > > > > > > >> > > > > > > > > >> a solution can be added to "updateDelegationTokens" > > > > > > > >> > > > > > > > >> @David or all others plz share whether you agree on this or > > you > > > > have > > > > > > > >> better > > > > > > > >> idea/suggestion. > > > > > > > >> > > > > > > > >> BR, > > > > > > > >> G > > > > > > > >> > > > > > > > >> > > > > > > > >> On Tue, Jan 25, 2022 at 11:00 AM Gabor Somogyi < > > > > > > > gabor.g.somo...@gmail.com > > > > > > > >> > > > > > > > > >> wrote: > > > > > > > >> > > > > > > > >> > First of all thanks for investing your time and helping me > > > out. > > > > > As I > > > > > > > see > > > > > > > >> > you have pretty solid knowledge in the RPC area. > > > > > > > >> > I would like to rely on your knowledge since I'm learning > > this > > > > > part. > > > > > > > >> > > > > > > > > >> > > - Do we need to introduce a new RPC method or can we for > > > > example > > > > > > > >> > piggyback > > > > > > > >> > on heartbeats? > > > > > > > >> > > > > > > > > >> > I'm fine with either solution but one thing is important > > > > > > conceptually. > > > > > > > >> > There are fundamentally 2 ways how tokens can be updated: > > > > > > > >> > - Push way: When there are new DTs then JM JVM pushes DTs > to > > > TM > > > > > > JVMs. > > > > > > > >> This > > > > > > > >> > is the preferred one since tiny amount of control logic > > > needed. > > > > > > > >> > - Pull way: Each time a TM would like to poll JM whether > > there > > > > are > > > > > > new > > > > > > > >> > tokens and each TM wants to decide alone whether DTs needs > > to > > > be > > > > > > > >> updated or > > > > > > > >> > not. 
> > > > > > > >> > As you've mentioned here some ID needs to be generated, it > > > would > > > > > > > >> generated > > > > > > > >> > quite some additional network traffic which can be > > definitely > > > > > > avoided. > > > > > > > >> > As a final thought in Spark we've had this way of DT > > > propagation > > > > > > logic > > > > > > > >> and > > > > > > > >> > we've had major issues with it. > > > > > > > >> > > > > > > > > >> > So all in all DTM needs to obtain new tokens and there > must > > a > > > > way > > > > > to > > > > > > > >> send > > > > > > > >> > this data to all TMs from JM. > > > > > > > >> > > > > > > > > >> > > - What delivery semantics are we looking for? (what if > > we're > > > > > only > > > > > > > >> able to > > > > > > > >> > update subset of TMs / what happens if we exhaust retries > / > > > > should > > > > > > we > > > > > > > >> even > > > > > > > >> > have the retry mechanism whatsoever) - I have a feeling > that > > > > > somehow > > > > > > > >> > leveraging the existing heartbeat mechanism could help to > > > answer > > > > > > these > > > > > > > >> > questions > > > > > > > >> > > > > > > > > >> > Let's go through these questions one by one. > > > > > > > >> > > What delivery semantics are we looking for? > > > > > > > >> > > > > > > > > >> > DTM must receive an exception when at least one TM was not > > > able > > > > to > > > > > > get > > > > > > > >> DTs. > > > > > > > >> > > > > > > > > >> > > what if we're only able to update subset of TMs? > > > > > > > >> > > > > > > > > >> > Such case DTM will reschedule token obtain after > > > > > > > >> > "security.kerberos.tokens.retry-wait" time. > > > > > > > >> > > > > > > > > >> > > what happens if we exhaust retries? > > > > > > > >> > > > > > > > > >> > There is no number of retries. In default configuration > > tokens > > > > > needs > > > > > > > to > > > > > > > >> be > > > > > > > >> > re-obtained after one day. 
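The delivery semantics answered above (DTM must receive an exception when at least one TM was not able to get the DTs, so it can reschedule after "security.kerberos.tokens.retry-wait") can be sketched as below. Illustrative Python with invented helper names; in the sketch each TM is just a callable that accepts the new token:

```python
# Hypothetical sketch of the push-based broadcast: failures are collected rather
# than aborting early, and a single exception is raised on any partial failure so
# the caller (the DTM) knows it has to retry later.
def broadcast_tokens(task_managers, token):
    """task_managers maps tm_id -> update callable; raises on partial failure."""
    failed = []
    for tm_id, update in task_managers.items():
        try:
            update(token)
        except Exception:
            failed.append(tm_id)
    if failed:
        raise RuntimeError("token update failed for: " + ", ".join(failed))
```

Collecting failures instead of stopping at the first one means every reachable TM still gets the fresh token, while the raised exception drives the retry-wait rescheduling.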
> > > > > > > >> > DTM tries to obtain new tokens after 1 day * 0.75
> > > > > > > >> > (security.kerberos.tokens.renewal-ratio) = 18 hours.
> > > > > > > >> > When it fails it retries after "security.kerberos.tokens.retry-wait",
> > > > > > > >> > which is 1 hour by default.
> > > > > > > >> > If it never succeeds then an authentication error is going to
> > > > > > > >> > happen on the TM side and the workload is going to stop.
> > > > > > > >> >
> > > > > > > >> > > should we even have the retry mechanism whatsoever?
> > > > > > > >> >
> > > > > > > >> > Yes, because there are always temporary cluster issues.
> > > > > > > >> >
> > > > > > > >> > > What does it mean for the running application (how does this
> > > > > > > >> > > look like from the user perspective)? As far as I remember
> > > > > > > >> > > the logs are only collected ("aggregated") after the
> > > > > > > >> > > container is stopped, is that correct?
> > > > > > > >> >
> > > > > > > >> > With default config it works like that but it can be forced to
> > > > > > > >> > aggregate at specific intervals.
> > > > > > > >> > A useful feature is forcing YARN to aggregate logs while the
> > > > > > > >> > job is still running.
> > > > > > > >> > For long-running jobs such as streaming jobs, this is
> > > > > > > >> > invaluable. To do this,
> > > > > > > >> > yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds
> > > > > > > >> > must be set to a non-negative value.
> > > > > > > >> > When this is set, a timer will be set for the given duration,
> > > > > > > >> > and whenever that timer goes off, log aggregation will run on
> > > > > > > >> > new files.
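The renewal schedule described above works out as a simple calculation. A worked example using the defaults quoted in the thread (a 24-hour token lifetime, security.kerberos.tokens.renewal-ratio of 0.75, and a 1-hour security.kerberos.tokens.retry-wait); the function names are illustrative, not Flink configuration APIs:

```python
# Worked example of the renewal timing: renewal is attempted at
# lifetime * renewal-ratio; on failure the next attempt comes after retry-wait.
def next_renewal_delay_hours(lifetime_hours, renewal_ratio=0.75):
    return lifetime_hours * renewal_ratio


def retry_delay_hours(retry_wait_hours=1.0):
    return retry_wait_hours


# With a 1-day lifetime: renew after 24 * 0.75 = 18 hours, leaving a 6-hour
# window for hourly retries before the old token expires.
```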
> > > > > > > >> > > > > > > > > >> > > I think > > > > > > > >> > this topic should get its own section in the FLIP (having > > some > > > > > cross > > > > > > > >> > reference to YARN ticket would be really useful, but I'm > not > > > > sure > > > > > if > > > > > > > >> there > > > > > > > >> > are any). > > > > > > > >> > > > > > > > > >> > I think this is important knowledge but this FLIP is not > > > > touching > > > > > > the > > > > > > > >> > already existing behavior. > > > > > > > >> > DTs are set on the AM container which is renewed by YARN > > until > > > > > it's > > > > > > > not > > > > > > > >> > possible anymore. > > > > > > > >> > Any kind of new code is not going to change this > limitation. > > > > BTW, > > > > > > > there > > > > > > > >> is > > > > > > > >> > no jira for this. > > > > > > > >> > If you think it worth to write this down then I think the > > good > > > > > place > > > > > > > is > > > > > > > >> > the official security doc > > > > > > > >> > area as caveat. > > > > > > > >> > > > > > > > > >> > > If we split the FLIP into two parts / sections that I've > > > > > > suggested, > > > > > > > I > > > > > > > >> > don't > > > > > > > >> > really think that you need to explicitly test for each > > > > deployment > > > > > > > >> scenario > > > > > > > >> > / cluster framework, because the DTM part is completely > > > > > independent > > > > > > of > > > > > > > >> the > > > > > > > >> > deployment target. Basically this is what I'm aiming for > > with > > > > > > "making > > > > > > > it > > > > > > > >> > work with the standalone" (as simple as starting a new > java > > > > > process) > > > > > > > >> Flink > > > > > > > >> > first (which is also how most people deploy streaming > > > > application > > > > > on > > > > > > > k8s > > > > > > > >> > and the direction we're pushing forward with the > > auto-scaling > > > / > > > > > > > reactive > > > > > > > >> > mode initiatives). 
> > > > > > > >> > > > > > > > > >> > I see your point and agree the main direction. k8s is the > > > > > megatrend > > > > > > > >> which > > > > > > > >> > most of the peoples > > > > > > > >> > will use sooner or later. Not 100% sure what kind of split > > you > > > > > > suggest > > > > > > > >> but > > > > > > > >> > in my view > > > > > > > >> > the main target is to add this feature and I'm open to any > > > > logical > > > > > > > work > > > > > > > >> > ordering. > > > > > > > >> > Please share the specific details and we work it out... > > > > > > > >> > > > > > > > > >> > G > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > On Mon, Jan 24, 2022 at 3:04 PM David Morávek < > > > d...@apache.org> > > > > > > > wrote: > > > > > > > >> > > > > > > > > >> >> > > > > > > > > >> >> > Could you point to a code where you think it could be > > added > > > > > > > exactly? > > > > > > > >> A > > > > > > > >> >> > helping hand is welcome here 🙂 > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> I think you can take a look at > > > > _ResourceManagerPartitionTracker_ > > > > > > [1] > > > > > > > >> which > > > > > > > >> >> seems to have somewhat similar properties to the DTM. > > > > > > > >> >> > > > > > > > >> >> One topic that needs to be addressed there is how the RPC > > > with > > > > > the > > > > > > > >> >> _TaskExecutorGateway_ should look like. > > > > > > > >> >> - Do we need to introduce a new RPC method or can we for > > > > example > > > > > > > >> piggyback > > > > > > > >> >> on heartbeats? > > > > > > > >> >> - What delivery semantics are we looking for? 
(what if > > we're > > > > only > > > > > > > able > > > > > > > >> to > > > > > > > >> >> update subset of TMs / what happens if we exhaust > retries / > > > > > should > > > > > > we > > > > > > > >> even > > > > > > > >> >> have the retry mechanism whatsoever) - I have a feeling > > that > > > > > > somehow > > > > > > > >> >> leveraging the existing heartbeat mechanism could help to > > > > answer > > > > > > > these > > > > > > > >> >> questions > > > > > > > >> >> > > > > > > > >> >> In short, after DT reaches it's max lifetime then log > > > > aggregation > > > > > > > stops > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> What does it mean for the running application (how does > > this > > > > look > > > > > > > like > > > > > > > >> >> from > > > > > > > >> >> the user perspective)? As far as I remember the logs are > > only > > > > > > > collected > > > > > > > >> >> ("aggregated") after the container is stopped, is that > > > > correct? I > > > > > > > think > > > > > > > >> >> this topic should get its own section in the FLIP (having > > > some > > > > > > cross > > > > > > > >> >> reference to YARN ticket would be really useful, but I'm > > not > > > > sure > > > > > > if > > > > > > > >> there > > > > > > > >> >> are any). > > > > > > > >> >> > > > > > > > >> >> All deployment modes (per-job, per-app, ...) are planned > to > > > be > > > > > > tested > > > > > > > >> and > > > > > > > >> >> > expect to work with the initial implementation however > > not > > > > all > > > > > > > >> >> deployment > > > > > > > >> >> > targets (k8s, local, ... 
> > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> If we split the FLIP into two parts / sections that I've > > > > > > suggested, I > > > > > > > >> >> don't > > > > > > > >> >> really think that you need to explicitly test for each > > > > deployment > > > > > > > >> scenario > > > > > > > >> >> / cluster framework, because the DTM part is completely > > > > > independent > > > > > > > of > > > > > > > >> the > > > > > > > >> >> deployment target. Basically this is what I'm aiming for > > with > > > > > > "making > > > > > > > >> it > > > > > > > >> >> work with the standalone" (as simple as starting a new > java > > > > > > process) > > > > > > > >> Flink > > > > > > > >> >> first (which is also how most people deploy streaming > > > > application > > > > > > on > > > > > > > >> k8s > > > > > > > >> >> and the direction we're pushing forward with the > > > auto-scaling / > > > > > > > >> reactive > > > > > > > >> >> mode initiatives). > > > > > > > >> >> > > > > > > > >> >> The whole integration with YARN (let's forget about log > > > > > aggregation > > > > > > > >> for a > > > > > > > >> >> moment) / k8s-native only boils down to how do we make > the > > > > keytab > > > > > > > file > > > > > > > >> >> local to the JobManager so the DTM can read it, so it's > > > > basically > > > > > > > >> built on > > > > > > > >> >> top of that. The only special thing that needs to be > tested > > > > there > > > > > > is > > > > > > > >> the > > > > > > > >> >> "keytab distribution" code path. > > > > > > > >> >> > > > > > > > >> >> [1] > > > > > > > >> >> > > > > > > > >> >> > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java > > > > > > > >> >> > > > > > > > >> >> Best, > > > > > > > >> >> D. 
> > > > > > > >> >> > > > > > > > >> >> On Mon, Jan 24, 2022 at 12:35 PM Gabor Somogyi < > > > > > > > >> gabor.g.somo...@gmail.com > > > > > > > >> >> > > > > > > > > >> >> wrote: > > > > > > > >> >> > > > > > > > >> >> > > There is a separate JobMaster for each job > > > > > > > >> >> > within a Flink cluster and each JobMaster only has a > > > partial > > > > > view > > > > > > > of > > > > > > > >> the > > > > > > > >> >> > task managers > > > > > > > >> >> > > > > > > > > >> >> > Good point! I've had a deeper look and you're right. We > > > > > > definitely > > > > > > > >> need > > > > > > > >> >> to > > > > > > > >> >> > find another place. > > > > > > > >> >> > > > > > > > > >> >> > > Related per-cluster or per-job keytab: > > > > > > > >> >> > > > > > > > > >> >> > In the current code per-cluster keytab is implemented > and > > > I'm > > > > > > > >> intended > > > > > > > >> >> to > > > > > > > >> >> > keep it like this within this FLIP. The reason is > simple: > > > > > tokens > > > > > > on > > > > > > > >> TM > > > > > > > >> >> side > > > > > > > >> >> > can be stored within the UserGroupInformation (UGI) > > > structure > > > > > > which > > > > > > > >> is > > > > > > > >> >> > global. I'm not telling it's impossible to change that > > but > > > I > > > > > > think > > > > > > > >> that > > > > > > > >> >> > this is such a complexity which the initial > > implementation > > > is > > > > > not > > > > > > > >> >> required > > > > > > > >> >> > to contain. Additionally we've not seen such need from > > user > > > > > side. > > > > > > > If > > > > > > > >> the > > > > > > > >> >> > need may rise later on then another FLIP with this > topic > > > can > > > > be > > > > > > > >> created > > > > > > > >> >> and > > > > > > > >> >> > discussed. Proper multi-UGI handling within a single > JVM > > > is a > > > > > > topic > > > > > > > >> >> where > > > > > > > >> >> > several round of deep-dive with the Hadoop/YARN guys > are > > > > > > required. 
> > > > > > > >> >> > > > > > > > > >> >> > > single DTM instance embedded with > > > > > > > >> >> > the ResourceManager (the Flink component) > > > > > > > >> >> > > > > > > > > >> >> > Could you point to a code where you think it could be > > added > > > > > > > exactly? > > > > > > > >> A > > > > > > > >> >> > helping hand is welcome here🙂 > > > > > > > >> >> > > > > > > > > >> >> > > Then the single (initial) implementation should work > > with > > > > all > > > > > > the > > > > > > > >> >> > deployments modes out of the box (which is not what the > > > FLIP > > > > > > > >> suggests). > > > > > > > >> >> Is > > > > > > > >> >> > that correct? > > > > > > > >> >> > > > > > > > > >> >> > All deployment modes (per-job, per-app, ...) are > planned > > to > > > > be > > > > > > > tested > > > > > > > >> >> and > > > > > > > >> >> > expect to work with the initial implementation however > > not > > > > all > > > > > > > >> >> deployment > > > > > > > >> >> > targets (k8s, local, ...) are not intended to be > tested. > > > Per > > > > > > > >> deployment > > > > > > > >> >> > target new jira needs to be created where I expect > small > > > > number > > > > > > of > > > > > > > >> codes > > > > > > > >> >> > needs to be added and relatively expensive testing > effort > > > is > > > > > > > >> required. > > > > > > > >> >> > > > > > > > > >> >> > > I've taken a look into the prototype and in the > > > > > > > >> >> "YarnClusterDescriptor" > > > > > > > >> >> > you're injecting a delegation token into the AM [1] > > (that's > > > > > > > obtained > > > > > > > >> >> using > > > > > > > >> >> > the provided keytab). If I understand this correctly > from > > > > > > previous > > > > > > > >> >> > discussion / FLIP, this is to support log aggregation > and > > > DT > > > > > has > > > > > > a > > > > > > > >> >> limited > > > > > > > >> >> > validity. How is this DT going to be renewed? 
> > > > > > > >> >> > > > > > > > > >> >> > You're clever and touched a limitation which Spark has > > too. > > > > In > > > > > > > short, > > > > > > > >> >> after > > > > > > > >> >> > DT reaches it's max lifetime then log aggregation > stops. > > > I've > > > > > had > > > > > > > >> >> several > > > > > > > >> >> > deep-dive rounds with the YARN guys at Spark years > > because > > > > > wanted > > > > > > > to > > > > > > > >> >> fill > > > > > > > >> >> > this gap. They can't provide us any way to re-inject > the > > > > newly > > > > > > > >> obtained > > > > > > > >> >> DT > > > > > > > >> >> > so at the end I gave up this. > > > > > > > >> >> > > > > > > > > >> >> > BR, > > > > > > > >> >> > G > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > > >> >> > On Mon, 24 Jan 2022, 11:00 David Morávek, < > > d...@apache.org > > > > > > > > > > wrote: > > > > > > > >> >> > > > > > > > > >> >> > > Hi Gabor, > > > > > > > >> >> > > > > > > > > > >> >> > > There is actually a huge difference between > JobManager > > > > > > (process) > > > > > > > >> and > > > > > > > >> >> > > JobMaster (job coordinator). The naming is > > unfortunately > > > > bit > > > > > > > >> >> misleading > > > > > > > >> >> > > here from historical reasons. There is a separate > > > JobMaster > > > > > for > > > > > > > >> each > > > > > > > >> >> job > > > > > > > >> >> > > within a Flink cluster and each JobMaster only has a > > > > partial > > > > > > view > > > > > > > >> of > > > > > > > >> >> the > > > > > > > >> >> > > task managers (depends on where the slots for a > > > particular > > > > > job > > > > > > > are > > > > > > > >> >> > > allocated). This means that you'll end up with N > > > > > > > >> >> > "DelegationTokenManagers" > > > > > > > >> >> > > competing with each other (N = number of running jobs > > in > > > > the > > > > > > > >> cluster). 
This makes me think we're mixing two abstraction levels here:

a) Per-cluster delegation tokens
- Simpler approach; it would involve a single DTM instance embedded with the ResourceManager (the Flink component).

b) Per-job delegation tokens
- More complex approach, but could be more flexible from the user side of things.
- Multiple DTM instances, bound to the JobMaster lifecycle. Delegation tokens are attached to the particular slots that are executing the job tasks instead of to the whole task manager (a TM could be executing multiple jobs with different tokens).
- The question is which keytab should be used for the clustering framework, to support log aggregation on YARN (an extra keytab, or the keytab that comes with the first job?).

I think these are the things that need to be clarified in the FLIP before proceeding.
A follow-up question for getting a better understanding of where this should be headed: are there any use cases where a user may want to use a different keytab with each job, or are we fine with using a cluster-wide keytab? If we go with per-cluster keytabs, is it OK that all jobs submitted into this cluster can access it (even future ones)? Should this be a security concern?

> Presume you thought I would implement a new class with the JobManager name. The plan is not that.

I've never suggested such a thing.

> No. As said earlier, DT handling is planned to be done completely in Flink. DTM has a renewal thread which re-obtains tokens at the proper time when needed.

Then the single (initial) implementation should work with all the deployment modes out of the box (which is not what the FLIP suggests). Is that correct?
If the cluster framework also requires a delegation token for its inner workings (IMO this only applies to YARN), it might need an extra step (injecting the token into the application master container).

Separating the individual layers in the FLIP would be useful: the actual Flink cluster (basically making this work with a standalone deployment) and the "cluster framework" (support for YARN log aggregation).

> Reading the linked Spark readme could be useful.

I've read it, but please be patient with the questions; Kerberos is not an easy topic to get into and I've had very little contact with it in the past.
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176

I've taken a look into the prototype, and in the "YarnClusterDescriptor" you're injecting a delegation token into the AM [1] (that's obtained using the provided keytab). If I understand this correctly from the previous discussion / FLIP, this is to support log aggregation and the DT has a limited validity. How is this DT going to be renewed?

[1]
https://github.com/gaborgsomogyi/flink/commit/8ab75e46013f159778ccfce52463e7bc63e395a9#diff-02416e2d6ca99e1456f9c3949f3d7c2ac523d3fe25378620c09632e4aac34e4eR1261

Best,
D.
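Gabor's answer further down the thread is that the DTM itself re-obtains tokens before expiry rather than relying on YARN renewal. A rough sketch of that renewal loop follows; all class and method names here are illustrative assumptions, not the FLIP-211 API. The common pattern (also used by Spark) is to schedule the next obtain after a fraction of the token's validity has elapsed:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch of a delegation token renewal loop (not the FLIP-211 API). */
public class DtRenewalSketch {
    // Re-obtain when 75% of the validity window has elapsed, so fresh tokens
    // reach the TMs well before the old ones expire.
    public static final double RENEWAL_RATIO = 0.75;

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Delay until the next obtain, given the token's issue and expiry timestamps. */
    public static long nextObtainDelayMillis(long issueTimeMillis, long expiryTimeMillis) {
        return (long) ((expiryTimeMillis - issueTimeMillis) * RENEWAL_RATIO);
    }

    /**
     * Obtain tokens, hand them to a distributor (e.g. an RPC broadcast to TMs),
     * then reschedule. In reality each newly obtained token carries a fresh
     * expiry, which would be fed back into the delay computation.
     */
    public void start(Runnable obtainAndDistribute, long issueTime, long expiryTime) {
        obtainAndDistribute.run();
        scheduler.schedule(
                () -> start(obtainAndDistribute, issueTime, expiryTime),
                nextObtainDelayMillis(issueTime, expiryTime),
                TimeUnit.MILLISECONDS);
    }
}
```

For example, a token valid for one hour would be re-obtained after 45 minutes.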
On Fri, Jan 21, 2022 at 9:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Here is the exact class (I'm on mobile so I haven't checked the exact class name):
https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
That keeps track of the TMs to which the tokens can be sent.

> My feeling would be that we shouldn't really introduce a new component with a custom lifecycle, but rather we should try to incorporate this into existing ones.

Can you be more specific? I presume you thought I would implement a new class with the JobManager name. The plan is not that.

> If I understand this correctly, this means that we then push the token renewal logic to YARN.

No. As said earlier, DT handling is planned to be done completely in Flink.
The DTM has a renewal thread which re-obtains tokens at the proper time when needed. YARN log aggregation is a totally different feature, where YARN does the renewal. Log aggregation was an example of why the code can't be 100% reusable for all resource managers. Reading the linked Spark readme could be useful.

G

On Fri, 21 Jan 2022, 21:05 David Morávek <d...@apache.org> wrote:

> JobManager is the Flink class.

There is no such class in Flink. The closest thing to the JobManager is the ClusterEntrypoint. The cluster entrypoint spawns a new RM Runner & Dispatcher Runner that start participating in the leader election. Once they gain leadership, they spawn the actual underlying instances of these two "main components".
My feeling would be that we shouldn't really introduce a new component with a custom lifecycle, but rather we should try to incorporate this into existing ones.

My biggest concerns would be:

- What would the lifecycle of the new component look like with regard to HA setups? If we really decide to introduce a completely new component, how should this work in case of multiple JobManager instances?
- Which components does it talk to, and how? For example, what does the broadcast of a new token to the task managers (TaskManagerGateway) look like? Do we simply introduce a new RPC on the ResourceManagerGateway that broadcasts it, or does the new component need to do some kind of bookkeeping of the task managers that it needs to notify?

> YARN based HDFS log aggregation would not work by dropping that code.
> Just to be crystal clear, the actual implementation contains this for exactly this reason.

This is the missing part, +1. If I understand this correctly, this means that we then push the token renewal logic to YARN. How do you plan to implement the renewal logic on k8s?

D.

On Fri, Jan 21, 2022 at 8:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> I think we might both mean something different by the RM.

Good intuition; I've not specified these terms well in the explanation. By RM I meant the resource management framework. JobManager is the Flink class. This means that inside the JM instance there will be a DTM instance, so they would have the same lifecycle. Hope I've answered the question.
> If we have tokens available on the client side, why do we need to set them into the AM (YARN specific concept) launch context?

YARN based HDFS log aggregation would not work if we dropped that code. Just to be crystal clear, the actual implementation contains this for exactly this reason.

G

On Fri, 21 Jan 2022, 20:12 David Morávek <d...@apache.org> wrote:

Hi Gabor,

> 1. One thing is important: token management is planned to be done generically within Flink and not scattered in RM specific code. The JobManager has a DelegationTokenManager which obtains tokens from time to time (if configured properly). The JM knows which TaskManagers are in place, so it can distribute them to all TMs. That's it, basically.
I think we might both mean something different by the RM. The JobManager is basically just a process encapsulating multiple components, one of which is a ResourceManager, which is the component that manages task manager registrations [1]. There is more or less a single implementation of the RM, with pluggable drivers for the active integrations (YARN, k8s).

It would be great if you could share more details of how exactly the DTM is going to fit into the current JM architecture.

> 2. 99.9% of the code is generic, but each RM handles tokens differently. A good example is that YARN obtains tokens on the client side and then sets them on the newly created AM container launch context. This is purely YARN specific and can't be spared.
> With my actual plans, standalone can be changed to use the framework. By using it I mean no RM specific DTM or whatsoever is needed.

If we have tokens available on the client side, why do we need to set them into the AM (a YARN specific concept) launch context? Why can't we simply send them to the JM, e.g. as a parameter of the job submission / via a separate RPC call? There might be something I'm missing due to limited knowledge, but handling the token at the "cluster framework" level doesn't seem necessary.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#jobmanager

Best,
D.
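David's two options above (a new RPC broadcast vs. bookkeeping of task managers) can be illustrated with a minimal sketch. The names here, such as `updateDelegationTokens` and `TaskExecutorGateway`, are hypothetical and not the actual Flink gateway surface; the point is simply that whichever component owns the DTM needs a registry of TM gateways it can push fresh tokens to:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Minimal sketch of broadcasting new tokens to registered TMs (hypothetical names). */
public class TokenBroadcastSketch {
    /** Stand-in for the task executor's RPC surface. */
    public interface TaskExecutorGateway {
        void updateDelegationTokens(byte[] serializedTokens);
    }

    // Bookkeeping of registered task executors, keyed by resource ID.
    private final Map<String, TaskExecutorGateway> registeredTaskExecutors =
            new ConcurrentHashMap<>();

    public void registerTaskExecutor(String resourceId, TaskExecutorGateway gateway) {
        registeredTaskExecutors.put(resourceId, gateway);
    }

    /** Called by the DTM whenever it has obtained fresh tokens. */
    public void broadcastTokens(byte[] serializedTokens) {
        registeredTaskExecutors.values()
                .forEach(gateway -> gateway.updateDelegationTokens(serializedTokens));
    }
}
```

Embedding this registry in the ResourceManager (which already tracks TM registrations) avoids a second, competing bookkeeping mechanism, which is the argument for option (a) in the thread.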
On Fri, Jan 21, 2022 at 7:48 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Oh, and one more thing: I'm planning to add this feature in small chunks of PRs because security is a super hairy area. That way reviewers can more easily grasp the concept.

On Fri, 21 Jan 2022, 18:03 David Morávek <d...@apache.org> wrote:

Hi Gabor,

thanks for drafting the FLIP, I think having solid Kerberos support is crucial for many enterprise deployments.
I have multiple questions regarding the implementation (note that I have very limited knowledge of Kerberos):

1) If I understand it correctly, we'll only obtain tokens in the job manager and then we'll distribute them via RPC (which needs to be secured).

Can you please outline how the communication will look? Is the DelegationTokenManager going to be a part of the ResourceManager? Can you outline its lifecycle / how it's going to be integrated there?

2) Do we really need YARN / k8s specific implementations? Is it possible to obtain / renew a token in a generic way? Maybe to rephrase that: is it possible to implement the DelegationTokenManager for standalone Flink?
If we're able to solve this point, it could be possible to target all deployment scenarios with a single implementation.

Best,
D.

On Fri, Jan 14, 2022 at 3:47 AM Junfan Zhang <zuston.sha...@gmail.com> wrote:

Hi G

Thanks for explaining in detail. I've understood your thoughts, and in any case this proposal is a great improvement.

Looking forward to your implementation, and I will keep an eye on it. Thanks again.

Best,
JunFan.
On Jan 13, 2022, 9:20 PM +0800, Gabor Somogyi <gabor.g.somo...@gmail.com>, wrote:

Just to confirm: keeping "security.kerberos.fetch.delegation-token" is added to the doc.

BR,
G

On Thu, Jan 13, 2022 at 1:34 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Hi JunFan,

> By the way, maybe this should be added in the migration plan or integration section in the FLIP-211.

Going to add this soon.

> Besides, I have a question: the KDC will collapse when the cluster reaches 200 nodes, as you described in the google doc.
> Do you have any attachment or reference to prove it?

"The KDC *may* collapse under some circumstances" is the proper wording.

We have several customers who are executing workloads on Spark/Flink. Most of the time I'm facing their daily issues, which are heavily environment- and use-case-dependent. I've seen various cases:
* where the mentioned ~1k nodes were working fine
* where the KDC thought the number of requests were coming from a DDoS attack, so it discontinued authentication
* where the KDC was simply not responding because of the load
* where the KDC intermittently had some outages (this was the nastiest thing)

Since you're managing a relatively big cluster, you know that the KDC is not only used by Spark/Flink workloads; the whole company IT infrastructure is bombing it, so whether the KDC reaches its limit really depends on other factors too. I'm not sure what kind of evidence you are looking for, but I'm not authorized to share any information about our clients' data.

One thing is for sure: the more external system types (for ex. HDFS, HBase, Hive, Kafka) are used in workloads that authenticate through the KDC, the higher the possibility of reaching this threshold when the cluster is big enough.
All in all, this feature is here to help all users never reach this limitation.

BR,
G

On Thu, Jan 13, 2022 at 1:00 PM 张俊帆 <zuston.sha...@gmail.com> wrote:

Hi G

Thanks for your quick reply. I think keeping the config *security.kerberos.fetch.delegation-token* and simply disabling the token fetching is a good idea. By the way, maybe this should be added in the migration plan or integration section in the FLIP-211.
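The flag discussed above is an existing Flink configuration option, so keeping it gives users a way to opt out of token fetching entirely. A minimal flink-conf.yaml fragment (the default behavior, if I read the docs correctly, is to fetch tokens):

```yaml
# flink-conf.yaml
# Disable delegation token fetching entirely.
# Useful when tokens are managed outside Flink.
security.kerberos.fetch.delegation-token: false
```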
Besides, I have a question: you described in the google doc that the KDC will collapse when the cluster reaches 200 nodes. Do you have any attachment or reference to prove it? In our internal clusters the node count exceeds 1000 and the KDC looks fine. Did I miss or misunderstand something? Please correct me.

Best,
JunFan.
On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org, wrote:

https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ