We've made the changes in both the doc and the wiki. Please have a look and notify me if I've missed something based on our agreement.

G

On Fri, Jan 28, 2022 at 4:04 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Thanks for making the design better! No further thing to discuss from my side.
>
> I've started to reflect the agreement in the FLIP doc.
> Since I don't have access to the wiki I need to ask Marci to do that, which may take some time.
>
> G

On Fri, Jan 28, 2022 at 3:52 PM David Morávek <d...@apache.org> wrote:

> Hi,
>
> > AFAIU an under-registration TM is not added to the registered TMs map until RegistrationResponse ..
>
> I think you're right; with a careful design around threading (delegating update broadcasts to the main thread) plus a synchronous initial update (which would be nice to avoid) this should be doable.
>
> > Not sure what you mean by "we can't register the TM without providing it with token", but in an unsecure configuration registration must happen w/o tokens.
>
> Exactly as you describe it. This was meant only for the "kerberized / secured" cluster case; in other cases we wouldn't enforce a non-null token in the response.
>
> > I think this is a good idea in general.
>
> +1
>
> If you don't have any more thoughts on the RPC / lifecycle part, can you please reflect it into the FLIP?
>
> D.

On Fri, Jan 28, 2022 at 3:16 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> > - Make sure DTs issued by single DTMs are monotonically increasing (can be sorted on TM side)
>
> AFAIU an under-registration TM is not added to the registered TMs map until the RegistrationResponse is processed, which would contain the initial tokens. If that's true, then how is it possible to have a race with a DTM update, which works on the registered TMs list?
> To be more specific: "taskExecutors" is the registered map of TMs to which the DTM can send updated tokens, but this doesn't contain the under-registration TM while the RegistrationResponse is not yet processed, right?
> Of course, if the DTM can update while the RegistrationResponse is being processed, then somehow sorting would be required, and in that case I would agree.
>
> > - Scope DT updates by the RM ID and ensure that TM only accepts update from the current leader
>
> I've planned this initially the mentioned way, so agreed.
>
> > - Return initial token with the RegistrationResponse, which should make the RPC contract bit clearer (ensure that we can't register the TM without providing it with token)
>
> I think this is a good idea in general. Not sure what you mean by "we can't register the TM without providing it with token", but in an unsecure configuration registration must happen w/o tokens.
> All in all, the newly added tokens field must somehow be optional.
>
> G

On Fri, Jan 28, 2022 at 2:22 PM David Morávek <d...@apache.org> wrote:

> We had a long discussion with Chesnay about the possible edge cases and it basically boils down to the following two scenarios:
>
> 1) There is a possible race condition between TM registration (the first DT update) and token refresh if they happen simultaneously. Then the registration might beat the refreshed token. This could be easily addressed if DTs could be sorted (eg. by the expiration time) on the TM side. In other words, if there are multiple updates at the same time we need to make sure that we have a deterministic way of choosing the latest one.
>
> One idea by Chesnay that popped up during this discussion was whether we could simply return the initial token with the RegistrationResponse to avoid making an extra call during the TM registration.
>
> 2) When the RM leadership changes (eg. because the zookeeper session times out) there might be a race condition where the old RM is shutting down and updates the tokens, so it might again beat the registration token of the new RM. This could be avoided if we scope the token by _ResourceManagerId_ and only accept updates for the current leader (basically we'd have an extra parameter to the _updateDelegationToken_ method).
>
> -
>
> The DTM is way simpler than, for example, slot management, which could receive updates from the JobMaster that the RM might not know about.
>
> So if you want to go down the path you're describing, it should be doable, and we'd propose the following to cover all cases:
>
> - Make sure DTs issued by single DTMs are monotonically increasing (can be sorted on the TM side)
> - Scope DT updates by the RM ID and ensure that the TM only accepts updates from the current leader
> - Return the initial token with the RegistrationResponse, which should make the RPC contract a bit clearer (ensure that we can't register the TM without providing it with a token)
>
> Any thoughts?

On Fri, Jan 28, 2022 at 10:53 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Thanks for investing your time!
>
> The first 2 bulletpoints are clear.
> If there is a chance that a TM can go into an inconsistent state then I agree with the 3rd bulletpoint.
> Just before we agree on that, I would like to learn something new and understand how it is possible that a TM gets corrupted. (In Spark I've never seen such a thing and there is no mechanism to fix it, but Flink is definitely not Spark.)
>
> Here is my understanding:
> * The DTM pushes newly obtained DTs to TMs, and if any exception occurs then a retry happens after "security.kerberos.tokens.retry-wait". This means the DTM retries as long as it's not possible to send the new DTs to all registered TMs.
> * New TM registration must fail if "updateDelegationToken" fails.
> * "updateDelegationToken" fails consistently, like a DB (at least I plan to implement it that way). If DTs arrive on the TM side then a single "UserGroupInformation.getCurrentUser.addCredentials" call is made, which I've never seen fail.
> * I hope all other code parts are not touching the existing DTs within the JVM.
>
> I would like to emphasize that I'm not against adding it, I just want to see what kind of problems we are facing.
> It would ease catching bugs earlier and help in the maintenance.
>
> All in all, I would buy the idea to add the 3rd bullet if we foresee the need.
>
> G

On Fri, Jan 28, 2022 at 10:07 AM David Morávek <d...@apache.org> wrote:

> Hi Gabor,
>
> This is definitely headed in the right direction, +1.
>
> I think we still need to have a safeguard in case some of the TMs get into an inconsistent state though, which will also eliminate the need for implementing a custom retry mechanism (when the _updateDelegationToken_ call fails for some reason).
>
> We already have this safeguard in place for the slot pool (in case there are some slots in an inconsistent state - eg. we haven't freed them for some reason) and for the partition tracker, which could simply be enhanced. This is done via a periodic heartbeat from TaskManagers to the ResourceManager that contains a report about the state of these two components (from the TM perspective) so the RM can reconcile their state if necessary.
>
> I don't think adding an additional field to _TaskExecutorHeartbeatPayload_ should be a concern, as we only heartbeat every ~10s by default and the new field would be small compared to the rest of the existing payload. Also, the heartbeat doesn't need to contain the whole DT, just some identifier which signals whether the TM uses the right one, which could be significantly smaller.
>
> This is still a PUSH based approach, as the RM would again call the newly introduced _updateDelegationToken_ when it encounters an inconsistency (eg. due to a temporary network partition / a race condition we didn't test for / some other scenario we didn't think about). In practice these inconsistencies are super hard to avoid and reason about (and unfortunately yes, we see them happen from time to time), so reusing the existing mechanism that is designed for this exact problem simplifies things.
>
> To sum this up, we'd have three code paths for calling _updateDelegationToken_:
> 1) When the TM registers, we push the token (if the DTM already has it) to it
> 2) When the DTM obtains a new token, it broadcasts it to all currently connected TMs
> 3) When a TM gets out of sync, the DTM reconciles its state
>
> WDYT?
>
> Best,
> D.
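Taken together, the rules agreed in the mails above (monotonic, sortable tokens and RM-ID fencing) imply simple TM-side acceptance logic. A minimal sketch, assuming a hypothetical `TokenUpdate` message carrying an expiration timestamp and the issuing RM's fencing id; these names are illustrative, not Flink's actual classes:

```java
import java.util.Objects;

/** Hedged sketch of the TM-side update rules discussed in the thread;
 *  TokenUpdate and its fields are illustrative stand-ins, not Flink API. */
final class TokenUpdateRules {

    /** Illustrative stand-in for a delegation-token update message. */
    static final class TokenUpdate {
        final String resourceManagerId; // fencing: which RM issued this update
        final long expirationTime;      // used for deterministic ordering
        final byte[] tokens;

        TokenUpdate(String resourceManagerId, long expirationTime, byte[] tokens) {
            this.resourceManagerId = resourceManagerId;
            this.expirationTime = expirationTime;
            this.tokens = tokens;
        }
    }

    /** Fencing rule: only accept updates issued by the current leader. */
    static boolean acceptedFromLeader(TokenUpdate update, String currentLeaderId) {
        return Objects.equals(update.resourceManagerId, currentLeaderId);
    }

    /** Ordering rule: if two updates race (registration vs. refresh), keep
     *  the one with the later expiration so the outcome is deterministic. */
    static TokenUpdate latest(TokenUpdate a, TokenUpdate b) {
        if (a == null) return b;
        if (b == null) return a;
        return b.expirationTime > a.expirationTime ? b : a;
    }
}
```

With these two checks the registration-vs-refresh race and the old-leader race both resolve deterministically, regardless of message arrival order.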
On Wed, Jan 26, 2022 at 9:03 PM David Morávek <d...@apache.org> wrote:

> Thanks for the update, I'll go over it tomorrow.

On Wed, Jan 26, 2022 at 5:33 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi All,
>
> Since it has turned out that the DTM can't be added as a member of JobMaster
> <https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176>
> I've come up with a better proposal.
> David, thanks for pinpointing this, you've caught a bug in the early phase!
>
> Namely, ResourceManager
> <https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124>
> is a single-instance class where the DTM can be added as a member variable.
> It has a list of all already registered TMs, and new TM registration also happens here.
> To be more specific, the following can be added from a logic perspective:
> * Create a new DTM instance in ResourceManager
> <https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124>
> and start it (re-occurring thread to obtain new tokens)
> * Add a new function named "updateDelegationTokens" to TaskExecutorGateway
> <https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java#L54>
> * Call "updateDelegationTokens" on all registered TMs to propagate new DTs
> * In case of new TM registration, call "updateDelegationTokens" before registration succeeds to set up the new TM properly
>
> This way:
> * only a single DTM would live within a cluster, which is the expected behavior
> * the DTM is going to be added to a central place where all deployment targets can make use of it
> * DTs are going to be pushed to TMs, which generates less network traffic than a pull-based approach (please see my previous mail where I've described both approaches)
> * the HA scenario is going to be consistent because such
> <https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1069>
> a solution can be added to "updateDelegationTokens"
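The proposal above can be sketched end to end. Everything here is a simplified stand-in for illustration only; the real ResourceManager, TaskExecutorGateway, and registration plumbing are far richer than this:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hedged sketch of the push-based propagation proposed in the mail above.
 *  The names mirror the proposal but are simplified stand-ins, not the
 *  actual Flink RPC classes. */
final class DtmSketch {

    /** Stand-in for the proposed "updateDelegationTokens" gateway method. */
    interface TaskExecutorGateway {
        void updateDelegationTokens(byte[] serializedTokens);
    }

    /** Single DTM instance, living inside the ResourceManager. */
    static final class DelegationTokenManager {
        private final Map<String, TaskExecutorGateway> registeredTaskExecutors =
                new ConcurrentHashMap<>();
        private volatile byte[] currentTokens;

        /** The re-occurring obtain thread would call this with fresh DTs. */
        void onNewTokensObtained(byte[] tokens) {
            currentTokens = tokens;
            // Push to every registered TM (less traffic than TMs polling).
            registeredTaskExecutors.values()
                    .forEach(tm -> tm.updateDelegationTokens(tokens));
        }

        /** Called during TM registration, before registration succeeds. */
        void onTaskExecutorRegistration(String id, TaskExecutorGateway tm) {
            if (currentTokens != null) {
                tm.updateDelegationTokens(currentTokens);
            }
            registeredTaskExecutors.put(id, tm);
        }
    }
}
```

The registration path pushing before the TM is added to the map is what keeps the "under registration" TM out of the broadcast set, as discussed earlier in the thread.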
> @David, or all others, please share whether you agree on this or have a better idea/suggestion.
>
> BR,
> G

On Tue, Jan 25, 2022 at 11:00 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> First of all, thanks for investing your time and helping me out. As I see it, you have pretty solid knowledge in the RPC area.
> I would like to rely on your knowledge since I'm learning this part.
>
> > - Do we need to introduce a new RPC method or can we for example piggyback on heartbeats?
>
> I'm fine with either solution, but one thing is important conceptually. There are fundamentally 2 ways tokens can be updated:
> - Push way: when there are new DTs, the JM JVM pushes the DTs to the TM JVMs. This is the preferred one since only a tiny amount of control logic is needed.
> - Pull way: each TM polls the JM from time to time to see whether there are new tokens, and each TM decides alone whether the DTs need to be updated or not. As you've mentioned, some ID would need to be generated here, and it would generate quite some additional network traffic which can definitely be avoided. As a final thought, in Spark we had this kind of DT propagation logic and we had major issues with it.
>
> So all in all, the DTM needs to obtain new tokens and there must be a way to send this data to all TMs from the JM.
>
> > - What delivery semantics are we looking for? (what if we're only able to update subset of TMs / what happens if we exhaust retries / should we even have the retry mechanism whatsoever) - I have a feeling that somehow leveraging the existing heartbeat mechanism could help to answer these questions
>
> Let's go through these questions one by one.
>
> > What delivery semantics are we looking for?
>
> The DTM must receive an exception when at least one TM was not able to get the DTs.
>
> > what if we're only able to update subset of TMs?
>
> In such a case the DTM will reschedule the token obtain after "security.kerberos.tokens.retry-wait" time.
>
> > what happens if we exhaust retries?
>
> There is no fixed number of retries. In the default configuration, tokens need to be re-obtained after one day.
> The DTM tries to obtain new tokens after 1 day * 0.75 ("security.kerberos.tokens.renewal-ratio") = 18 hours.
> When that fails, it retries after "security.kerberos.tokens.retry-wait", which is 1 hour by default.
> If it never succeeds, then an authentication error is going to happen on the TM side and the workload is going to stop.
>
> > should we even have the retry mechanism whatsoever?
>
> Yes, because there are always temporary cluster issues.
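The schedule described above boils down to simple arithmetic. This sketch only illustrates the quoted defaults (24 h lifetime, renewal-ratio 0.75, 1 h retry-wait); it is not the actual DTM scheduling code, and the method names are made up for the example:

```java
import java.time.Duration;

/** Illustrative arithmetic for the renewal schedule described in the mail;
 *  names and structure are assumptions of this sketch, not Flink code. */
final class RenewalSchedule {

    /** Delay until the next proactive obtain attempt: lifetime * ratio,
     *  e.g. 24h * 0.75 = 18h. */
    static Duration nextObtainDelay(Duration tokenLifetime, double renewalRatio) {
        return Duration.ofMillis((long) (tokenLifetime.toMillis() * renewalRatio));
    }

    /** There is no hard retry cap; this helper just shows how many
     *  fixed-interval retries fit into the window before the old token
     *  expires (with the defaults: (24h - 18h) / 1h = 6 attempts). */
    static long retriesBeforeExpiry(Duration lifetime, double ratio, Duration retryWait) {
        Duration window = lifetime.minus(nextObtainDelay(lifetime, ratio));
        return window.toMillis() / retryWait.toMillis();
    }
}
```

So with the defaults there are roughly six hourly attempts before the TM-side authentication error described above can occur.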
> > What does it mean for the running application (how does this look from the user perspective)? As far as I remember the logs are only collected ("aggregated") after the container is stopped, is that correct?
>
> With the default config it works like that, but YARN can be forced to aggregate at specific intervals.
> A useful feature is forcing YARN to aggregate logs while the job is still running.
> For long-running jobs such as streaming jobs, this is invaluable. To do this, "yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds" must be set to a non-negative value.
> When this is set, a timer is set for the given duration, and whenever that timer goes off, log aggregation runs on the new files.
>
> > I think this topic should get its own section in the FLIP (having some cross reference to YARN ticket would be really useful, but I'm not sure if there are any).
>
> I think this is important knowledge, but this FLIP is not touching the already existing behavior.
> DTs are set on the AM container, which is renewed by YARN until that's no longer possible.
> No new code is going to change this limitation. BTW, there is no jira for this.
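For reference, the YARN property mentioned above lives in yarn-site.xml. The one-hour value below is only an example; YARN's default of -1 disables rolling aggregation:

```xml
<!-- Example only: aggregate logs of still-running containers every hour. -->
<property>
  <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
  <value>3600</value>
</property>
```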
> If you think it's worth writing this down, then I think the good place is the official security doc area, as a caveat.
>
> > If we split the FLIP into two parts / sections that I've suggested, I don't really think that you need to explicitly test for each deployment scenario / cluster framework, because the DTM part is completely independent of the deployment target. Basically this is what I'm aiming for with "making it work with the standalone" (as simple as starting a new java process) Flink first (which is also how most people deploy streaming application on k8s and the direction we're pushing forward with the auto-scaling / reactive mode initiatives).
>
> I see your point and agree with the main direction. k8s is the megatrend which most people will use sooner or later. I'm not 100% sure what kind of split you suggest, but in my view the main target is to add this feature, and I'm open to any logical work ordering.
> Please share the specific details and we'll work it out...
>
> G

On Mon, Jan 24, 2022 at 3:04 PM David Morávek <d...@apache.org> wrote:

> > Could you point to a code where you think it could be added exactly? A helping hand is welcome here 🙂
>
> I think you can take a look at _ResourceManagerPartitionTracker_ [1], which seems to have somewhat similar properties to the DTM.
>
> One topic that needs to be addressed there is what the RPC with the _TaskExecutorGateway_ should look like:
> - Do we need to introduce a new RPC method or can we for example piggyback on heartbeats?
> - What delivery semantics are we looking for? (what if we're only able to update a subset of TMs / what happens if we exhaust retries / should we even have the retry mechanism whatsoever) - I have a feeling that somehow leveraging the existing heartbeat mechanism could help to answer these questions
>
> > In short, after DT reaches its max lifetime then log aggregation stops
>
> What does it mean for the running application (how does this look from the user perspective)? As far as I remember the logs are only collected ("aggregated") after the container is stopped, is that correct? I think this topic should get its own section in the FLIP (having some cross reference to a YARN ticket would be really useful, but I'm not sure if there are any).
>
> > All deployment modes (per-job, per-app, ...) are planned to be tested and expect to work with the initial implementation however not all deployment targets (k8s, local, ...
>
> If we split the FLIP into the two parts / sections that I've suggested, I don't really think that you need to explicitly test for each deployment scenario / cluster framework, because the DTM part is completely independent of the deployment target. Basically this is what I'm aiming for with "making it work with standalone" (as simple as starting a new java process) Flink first (which is also how most people deploy streaming applications on k8s, and the direction we're pushing forward with the auto-scaling / reactive mode initiatives).
>
> The whole integration with YARN (let's forget about log aggregation for a moment) / k8s-native only boils down to how we make the keytab file local to the JobManager so the DTM can read it, so it's basically built on top of that. The only special thing that needs to be tested there is the "keytab distribution" code path.
>
> [1]
> https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
>
> Best,
> D.
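If the heartbeat-piggybacking route raised above were taken, the payload would only need a small identifier of the tokens the TM currently holds, e.g. a digest, rather than the DT itself. A rough sketch under that assumption; the names are illustrative, not the actual Flink heartbeat payload classes:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Sketch: the TM reports a digest of its current tokens with each
 *  heartbeat; the RM re-pushes tokens when the digest doesn't match.
 *  Illustrative only, not Flink's _TaskExecutorHeartbeatPayload_. */
final class TokenHeartbeat {

    /** Small identifier for the heartbeat payload instead of the whole DT. */
    static byte[] tokenDigest(byte[] serializedTokens) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(serializedTokens);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }

    /** RM side: decide whether this TM needs an updateDelegationToken call. */
    static boolean needsReconciliation(byte[] reportedDigest, byte[] currentDigest) {
        return !Arrays.equals(reportedDigest, currentDigest);
    }
}
```

A 32-byte digest every ~10 s is negligible next to the existing payload, which matches the sizing argument made earlier in the thread.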
On Mon, Jan 24, 2022 at 12:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> > There is a separate JobMaster for each job within a Flink cluster and each JobMaster only has a partial view of the task managers
>
> Good point! I've had a deeper look and you're right. We definitely need to find another place.
>
> > Related per-cluster or per-job keytab:
>
> In the current code a per-cluster keytab is implemented, and I intend to keep it like this within this FLIP. The reason is simple: tokens on the TM side can be stored within the UserGroupInformation (UGI) structure, which is global. I'm not saying it's impossible to change that, but I think this is a complexity which the initial implementation is not required to contain. Additionally, we've not seen such a need from the user side. If the need arises later on, then another FLIP on this topic can be created and discussed. Proper multi-UGI handling within a single JVM is a topic where several rounds of deep-dives with the Hadoop/YARN guys are required.
>
> > single DTM instance embedded with the ResourceManager (the Flink component)
>
> Could you point to the code where you think it could be added exactly? A helping hand is welcome here 🙂
>
> > Then the single (initial) implementation should work with all the deployments modes out of the box (which is not what the FLIP suggests). Is that correct?
>
> All deployment modes (per-job, per-app, ...) are planned to be tested and are expected to work with the initial implementation; however, not all deployment targets (k8s, local, ...) are intended to be tested. Per deployment target a new jira needs to be created, where I expect a small amount of code to be added and a relatively expensive testing effort to be required.
>
> > I've taken a look into the prototype and in the "YarnClusterDescriptor" you're injecting a delegation token into the AM [1] (that's obtained using the provided keytab). If I understand this correctly from previous discussion / FLIP, this is to support log aggregation and DT has a limited validity. How is this DT going to be renewed?
>
> You're clever and have touched a limitation which Spark has too. In short, after the DT reaches its max lifetime, log aggregation stops.
> I've had several deep-dive rounds with the YARN guys during the Spark years because I wanted to fill this gap. They can't provide us any way to re-inject the newly obtained DT, so in the end I gave up on this.
>
> BR,
> G

On Mon, 24 Jan 2022, 11:00 David Morávek, <d...@apache.org> wrote:

> Hi Gabor,
>
> There is actually a huge difference between the JobManager (process) and the JobMaster (job coordinator). The naming is unfortunately a bit misleading here for historical reasons. There is a separate JobMaster for each job within a Flink cluster and each JobMaster only has a partial view of the task managers (depending on where the slots for a particular job are allocated). This means that you'll end up with N "DelegationTokenManagers" competing with each other (N = number of running jobs in the cluster).
>
> This makes me think we're mixing two abstraction levels here:
>
> a) Per-cluster delegation tokens
> - Simpler approach, it would involve a single DTM instance embedded with the ResourceManager (the Flink component)
>
> b) Per-job delegation tokens
> - More complex approach, but could be more flexible from the user side of things.
> - Multiple DTM instances that are bound to the JobMaster lifecycle. Delegation tokens are attached to the particular slots that are executing the job tasks instead of the whole task manager (a TM could be executing multiple jobs with different tokens).
> - The question is which keytab should be used for the clustering framework, to support log aggregation on YARN (an extra keytab, the keytab that comes with the first job?)
>
> I think these are the things that need to be clarified in the FLIP before proceeding.
>
> A follow-up question for getting a better understanding of where this should be headed: Are there any use cases where users may want to use different keytabs with each job, or are we fine with using a cluster-wide keytab? If we go with per-cluster keytabs, is it OK that all jobs submitted into this cluster can access it (even the future ones)? Should this be a security concern?
>
> > Presume you thought I would implement a new class with the JobManager name. The plan is not that.
>
> I've never suggested such a thing.
>
> > No. That said, earlier DT handling is planned to be done completely in Flink. The DTM has a renewal thread which re-obtains tokens at the proper time when needed.
>
> Then the single (initial) implementation should work with all the deployment modes out of the box (which is not what the FLIP suggests). Is that correct?
>
> If the cluster framework also requires a delegation token for its inner workings (IMO this only applies to YARN), it might need an extra step (injecting the token into the application master container).
> > > > Separating the individual layers in the FLIP would be useful: the actual Flink cluster (basically making this work with a standalone deployment) vs. the "cluster framework" (support for YARN log aggregation).
> > > >
> > > > > Reading the linked Spark readme could be useful.
> > > >
> > > > I've read that, but please be patient with the questions; Kerberos is not an easy topic to get into and I've had very little contact with it in the past.
> > > >
> > > > > https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> > > >
> > > > I've taken a look into the prototype, and in the "YarnClusterDescriptor" you're injecting a delegation token into the AM [1] (one that's obtained using the provided keytab). If I understand this correctly from the previous discussion / FLIP, this is to support log aggregation, and the DT has a limited validity. How is this DT going to be renewed?
> > > > [1]
> > > > https://github.com/gaborgsomogyi/flink/commit/8ab75e46013f159778ccfce52463e7bc63e395a9#diff-02416e2d6ca99e1456f9c3949f3d7c2ac523d3fe25378620c09632e4aac34e4eR1261
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Fri, Jan 21, 2022 at 9:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> > > >
> > > > > Here is the exact class (I'm on mobile so I haven't had a look at the exact class name):
> > > > > https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> > > > > That keeps track of the TMs where the tokens can be sent.
> > > > >
> > > > > > My feeling would be that we shouldn't really introduce a new component with a custom lifecycle, but rather we should try to incorporate this into existing ones.
> > > > >
> > > > > Can you be more specific? Presume you thought I would implement a new class with the JobManager name. The plan is not that.
> > > > >
> > > > > > If I understand this correctly, this means that we then push the token renewal logic to YARN.
> > > > > No. As said earlier, DT handling is planned to be done completely in Flink. DTM has a renewal thread which re-obtains tokens in the proper time when needed. YARN log aggregation is a totally different feature, where YARN does the renewal. Log aggregation was an example of why the code can't be 100% reusable for all resource managers. Reading the linked Spark readme could be useful.
> > > > >
> > > > > G
> > > > >
> > > > > On Fri, 21 Jan 2022, 21:05 David Morávek, <d...@apache.org> wrote:
> > > > >
> > > > > > > JobManager is the Flink class.
> > > > > >
> > > > > > There is no such class in Flink. The closest thing to the JobManager is the ClusterEntrypoint. The cluster entrypoint spawns a new RM Runner & Dispatcher Runner that start participating in the leader election. Once they gain leadership they spawn the actual underlying instances of these two "main components".
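[Editor's note] Gabor's point above — a DTM renewal thread that "re-obtains tokens in the proper time when needed" — can be sketched roughly as follows. This is an illustrative sketch, not FLIP-211 code: the class and method names are made up, and the 0.75 renewal ratio is borrowed from Spark's delegation token framework (its default), which the thread repeatedly references.

```java
// Hypothetical sketch of the renewal thread's scheduling: re-obtain
// tokens after a fixed fraction of the token's validity window has
// elapsed, rather than waiting for expiry.
public final class RenewalSchedule {

    private RenewalSchedule() {}

    /**
     * Delay in milliseconds after which the DTM should re-obtain tokens.
     *
     * @param issueTimeMillis  when the current token was issued
     * @param expiryTimeMillis when the current token expires
     * @param renewalRatio     fraction of the validity window to wait (0..1)
     */
    public static long nextRenewalDelayMillis(
            long issueTimeMillis, long expiryTimeMillis, double renewalRatio) {
        if (expiryTimeMillis <= issueTimeMillis) {
            throw new IllegalArgumentException("token already expired");
        }
        return (long) ((expiryTimeMillis - issueTimeMillis) * renewalRatio);
    }

    public static void main(String[] args) {
        // A token valid for 24 hours, re-obtained at 75% of its lifetime.
        System.out.println(nextRenewalDelayMillis(0L, 86_400_000L, 0.75));
    }
}
```

The renewal thread would sleep for the returned delay, obtain fresh tokens, and reschedule itself from the new token's issue/expiry times.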
> > > > > > My feeling would be that we shouldn't really introduce a new component with a custom lifecycle, but rather we should try to incorporate this into existing ones.
> > > > > >
> > > > > > My biggest concerns would be:
> > > > > >
> > > > > > - How would the lifecycle of the new component look with regards to HA setups? If we really decide to introduce a completely new component, how should this work in case of multiple JobManager instances?
> > > > > > - Which components does it talk to / how? For example, how does the broadcast of a new token to the task managers (TaskManagerGateway) look? Do we simply introduce a new RPC on the ResourceManagerGateway that broadcasts it, or does the new component need to do some kind of bookkeeping of the task managers that it needs to notify?
> > > > > >
> > > > > > > YARN based HDFS log aggregation would not work by dropping that code. Just to be crystal clear, the actual implementation contains this for exactly this reason.
> > > > > > This is the missing part, +1. If I understand this correctly, this means that we then push the token renewal logic to YARN. How do you plan to implement the renewal logic on k8s?
> > > > > >
> > > > > > D.
> > > > > >
> > > > > > On Fri, Jan 21, 2022 at 8:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> > > > > >
> > > > > > > > I think we might both mean something different by the RM.
> > > > > > >
> > > > > > > You feel it well, I've not specified these terms well in the explanation. By RM I meant the resource management framework. JobManager is the Flink class. This means that inside the JM instance there will be a DTM instance, so they would have the same lifecycle. Hope I've answered the question.
> > > > > > >
> > > > > > > > If we have tokens available on the client side, why do we need to set them into the AM (yarn specific concept) launch context?
> > > > > > >
> > > > > > > YARN based HDFS log aggregation would not work by dropping that code.
> > > > > > > Just to be crystal clear, the actual implementation contains this for exactly this reason.
> > > > > > >
> > > > > > > G
> > > > > > >
> > > > > > > On Fri, 21 Jan 2022, 20:12 David Morávek, <d...@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi Gabor,
> > > > > > > >
> > > > > > > > > 1. One thing is important, token management is planned to be done generically within Flink and not scattered in RM specific code. JobManager has a DelegationTokenManager which obtains tokens time-to-time (if configured properly). JM knows which TaskManagers are in place so it can distribute it to all TMs. That's it basically.
> > > > > > > >
> > > > > > > > I think we might both mean something different by the RM. JobManager is basically just a process encapsulating multiple components, one of which is a ResourceManager, which is the component that manages task manager registrations [1].
> > > > > > > > There is more or less a single implementation of the RM with pluggable drivers for the active integrations (yarn, k8s).
> > > > > > > >
> > > > > > > > It would be great if you could share more details of how exactly the DTM is going to fit into the current JM architecture.
> > > > > > > >
> > > > > > > > > 2. 99.9% of the code is generic but each RM handles tokens differently. A good example is YARN, which obtains tokens on the client side and then sets them on the newly created AM container launch context. This is purely YARN specific and can't be spared. With my actual plans standalone can be changed to use the framework. By using it I mean no RM specific DTM or whatsoever is needed.
> > > > > > > >
> > > > > > > > If we have tokens available on the client side, why do we need to set them into the AM (yarn specific concept) launch context?
> > > > > > > > Why can't we simply send them to the JM, e.g. as a parameter of the job submission / via a separate RPC call? There might be something I'm missing due to limited knowledge, but handling the token on the "cluster framework" level doesn't seem necessary.
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#jobmanager
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > D.
> > > > > > > >
> > > > > > > > On Fri, Jan 21, 2022 at 7:48 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Oh and one more thing. I'm planning to add this feature in small chunks of PRs because security is a super hairy area. That way reviewers can more easily grasp the concept.
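[Editor's note] The token-distribution question raised in this part of the thread — does the manager need "some kind of bookkeeping of the task managers that it needs to notify"? — can be sketched like this. All names are hypothetical stand-ins, not actual Flink RPC gateways: the `Consumer<byte[]>` plays the role of a TaskExecutorGateway token-update RPC.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative sketch: the token manager tracks registered task executors
// and pushes every freshly obtained token set to each of them.
final class TokenBroadcaster {

    // task-executor id -> callback standing in for a TaskExecutorGateway RPC
    private final Map<String, Consumer<byte[]>> registeredTaskExecutors =
            new ConcurrentHashMap<>();

    void registerTaskExecutor(String id, Consumer<byte[]> tokenUpdateRpc) {
        registeredTaskExecutors.put(id, tokenUpdateRpc);
    }

    void unregisterTaskExecutor(String id) {
        registeredTaskExecutors.remove(id);
    }

    /** Called by the renewal thread whenever new tokens were obtained. */
    void broadcast(byte[] serializedTokens) {
        registeredTaskExecutors.values().forEach(rpc -> rpc.accept(serializedTokens));
    }
}
```

In the design eventually agreed above, this bookkeeping coincides with the RM's registered-TM map, so no separate registry is needed.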
> > > > > > > > > On Fri, 21 Jan 2022, 18:03 David Morávek, <d...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Gabor,
> > > > > > > > > >
> > > > > > > > > > thanks for drafting the FLIP, I think having solid Kerberos support is crucial for many enterprise deployments.
> > > > > > > > > >
> > > > > > > > > > I have multiple questions regarding the implementation (note that I have very limited knowledge of Kerberos):
> > > > > > > > > >
> > > > > > > > > > 1) If I understand it correctly, we'll only obtain tokens in the job manager and then we'll distribute them via RPC (needs to be secured).
> > > > > > > > > >
> > > > > > > > > > Can you please outline how the communication will look? Is the DelegationTokenManager going to be a part of the ResourceManager? Can you outline its lifecycle / how it's going to be integrated there?
> > > > > > > > > >
> > > > > > > > > > 2) Do we really need YARN / k8s specific implementations?
> > > > > > > > > > Is it possible to obtain / renew a token in a generic way? Maybe to rephrase that, is it possible to implement a DelegationTokenManager for standalone Flink? If we're able to solve this point, it could be possible to target all deployment scenarios with a single implementation.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > D.
> > > > > > > > > >
> > > > > > > > > > On Fri, Jan 14, 2022 at 3:47 AM Junfan Zhang <zuston.sha...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi G
> > > > > > > > > > >
> > > > > > > > > > > Thanks for explaining in detail. I have gotten your thoughts, and anyway this proposal is a great improvement.
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your implementation and I will keep focus on it. Thanks again.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > JunFan.
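[Editor's note] David's "single implementation" question in this part of the thread maps onto a provider-style abstraction: keep the manager itself deployment-agnostic and isolate per-system token logic behind an interface, which is roughly how Spark's delegation token framework (referenced throughout the thread) is structured. The sketch below is illustrative only; the interface and class names are assumptions, not the FLIP-211 API.

```java
import java.util.ArrayList;
import java.util.List;

// One provider per external system (HDFS, HBase, Kafka, ...); the
// manager loops over them and is independent of YARN/k8s/standalone.
interface DelegationTokenProvider {
    String serviceName();                 // e.g. "hadoopfs", "hbase", "kafka"
    boolean delegationTokensRequired();   // e.g. false on an unsecured cluster
    byte[] obtainDelegationTokens();      // contacts the external system
}

final class GenericDelegationTokenManager {
    private final List<DelegationTokenProvider> providers;

    GenericDelegationTokenManager(List<DelegationTokenProvider> providers) {
        this.providers = providers;
    }

    /** Obtains tokens from every applicable provider. */
    List<byte[]> obtainAll() {
        List<byte[]> tokens = new ArrayList<>();
        for (DelegationTokenProvider provider : providers) {
            if (provider.delegationTokensRequired()) {
                tokens.add(provider.obtainDelegationTokens());
            }
        }
        return tokens;
    }
}
```

The deployment-specific part (e.g. injecting tokens into the YARN AM launch context for log aggregation) then sits outside this generic loop, which is the separation of layers discussed above.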
> > > > > > > > > > > On Jan 13, 2022, 9:20 PM +0800, Gabor Somogyi <gabor.g.somo...@gmail.com>, wrote:
> > > > > > > > > > > > Just to confirm, keeping "security.kerberos.fetch.delegation-token" is added to the doc.
> > > > > > > > > > > >
> > > > > > > > > > > > BR,
> > > > > > > > > > > > G
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Jan 13, 2022 at 1:34 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi JunFan,
> > > > > > > > > > > > >
> > > > > > > > > > > > > > By the way, maybe this should be added in the migration plan or integration section in FLIP-211.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Going to add this soon.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Besides, I have a question about the KDC collapsing when the cluster reaches 200 nodes, which you described in the google doc. Do you have any attachment or reference to prove it?
> > > > > > > > > > > > > "KDC *may* collapse under some circumstances" is the proper wording.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We have several customers who are executing workloads on Spark/Flink. Most of the time I'm facing their daily issues, which are heavily environment and use-case dependent. I've seen various cases:
> > > > > > > > > > > > > * where the mentioned ~1k nodes were working fine
> > > > > > > > > > > > > * where KDC thought the number of requests was coming from a DDOS attack, so it discontinued authentication
> > > > > > > > > > > > > * where KDC was simply not responding because of the load
> > > > > > > > > > > > > * where KDC intermittently had some outage (this was the most nasty thing)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Since you're managing a relatively big cluster you know that KDC is
> > > > > > > > > > > > > not only used by Spark/Flink workloads: the whole company IT infrastructure is bombing it, so it really depends on other factors too whether KDC is reaching its limit or not. Not sure what kind of evidence you are looking for, but I'm not authorized to share any information about our clients' data.
> > > > > > > > > > > > >
> > > > > > > > > > > > > One thing is for sure. The more external system types are used in workloads (for ex. HDFS, HBase, Hive, Kafka) which authenticate through KDC, the more possibility to reach this threshold when the cluster is big enough.
> > > > > > > > > > > > >
> > > > > > > > > > > > > All in all, this feature is here to help all users never reach this limitation.
> > > > > > > > > > > > > BR,
> > > > > > > > > > > > > G
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Jan 13, 2022 at 1:00 PM 张俊帆 <zuston.sha...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi G
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for your quick reply. I think reserving the config of *security.kerberos.fetch.delegation-token* and simply disabling the token fetching is a good idea. By the way, maybe this should be added in the migration plan or integration section in FLIP-211.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Besides, I have a question about the KDC collapsing when the cluster reaches 200 nodes, which you described in the google doc. Do you have any attachment or reference to prove it?
> > > > > > > > > > > > > > Because in our internal per-cluster setup the number of nodes reaches > 1000 and KDC looks good. Did I miss or misunderstand something? Please correct me.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > JunFan.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org, wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
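[Editor's note] For reference, the `security.kerberos.fetch.delegation-token` option discussed throughout the thread is the existing switch for opting out of token fetching on clusters where it isn't wanted. A flink-conf.yaml fragment (shown for illustration; the option defaults to true) would look like:

```yaml
# flink-conf.yaml
# Disable delegation token fetching entirely, e.g. on clusters without
# Kerberos or where tokens are provisioned externally.
security.kerberos.fetch.delegation-token: false
```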