> Do we need to introduce a new RPC method or can we for example piggyback
> on heartbeats?
Seems we can use the very same approach as _ResourceManagerPartitionTracker_ is using:
- _TaskManagers_ periodically report which token they're using (e.g. identified by some id). This involves adding a new field to _TaskExecutorHeartbeatPayload_.
- Once the report arrives, the DTM checks the token and updates it if necessary (we'd introduce a new method for that on _TaskExecutorGateway_).
- If the update fails, we don't need to retry. The next heartbeat takes care of that.
- The heartbeat mechanism already covers TM failure scenarios.

On Mon, Jan 24, 2022 at 3:03 PM David Morávek <d...@apache.org> wrote:

>> Could you point to a code where you think it could be added exactly? A helping hand is welcome here 🙂
>
> I think you can take a look at _ResourceManagerPartitionTracker_ [1] which seems to have somewhat similar properties to the DTM.
>
> One topic that needs to be addressed there is how the RPC with the _TaskExecutorGateway_ should look like.
> - Do we need to introduce a new RPC method or can we for example piggyback on heartbeats?
> - What delivery semantics are we looking for? (What if we're only able to update a subset of TMs? What happens if we exhaust retries? Should we even have a retry mechanism whatsoever?) I have a feeling that somehow leveraging the existing heartbeat mechanism could help answer these questions.
>
>> In short, after the DT reaches its max lifetime then log aggregation stops
>
> What does this mean for the running application (how does it look from the user perspective)? As far as I remember the logs are only collected ("aggregated") after the container is stopped, is that correct? I think this topic should get its own section in the FLIP (having some cross-reference to a YARN ticket would be really useful, but I'm not sure if there are any).
>
>> All deployment modes (per-job, per-app, ...) are planned to be tested and expected to work with the initial implementation, however not all deployment targets (k8s, local, ...)
>
> If we split the FLIP into the two parts / sections that I've suggested, I don't really think that you need to explicitly test each deployment scenario / cluster framework, because the DTM part is completely independent of the deployment target. Basically this is what I'm aiming for with "making it work with the standalone" (as simple as starting a new java process) Flink first (which is also how most people deploy streaming applications on k8s, and the direction we're pushing forward with the auto-scaling / reactive mode initiatives).
>
> The whole integration with YARN (let's forget about log aggregation for a moment) / k8s-native only boils down to how we make the keytab file local to the JobManager so the DTM can read it, so it's basically built on top of that. The only special thing that needs to be tested there is the "keytab distribution" code path.
>
> [1]
> https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
>
> Best,
> D.
>
> On Mon, Jan 24, 2022 at 12:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> > There is a separate JobMaster for each job within a Flink cluster and each JobMaster only has a partial view of the task managers
>>
>> Good point! I've had a deeper look and you're right. We definitely need to find another place.
>>
>> > Related per-cluster or per-job keytab:
>>
>> In the current code a per-cluster keytab is implemented and I intend to keep it like this within this FLIP. The reason is simple: tokens on the TM side can be stored within the UserGroupInformation (UGI) structure, which is global.
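For readers less familiar with UGI: Hadoop's UserGroupInformation effectively stores the login user's credentials once per JVM, which is what makes per-cluster tokens the natural first step. A minimal stand-in illustrating the consequence (the class below only models the static nature of the store; it is not the Hadoop API):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Hadoop's UserGroupInformation credential store.
// The static map models why token storage is effectively global per JVM;
// names are illustrative, not the actual Hadoop API.
public class GlobalUgiSketch {
    // One credential map per JVM, like the login user's credentials in UGI.
    private static final Map<String, byte[]> CREDENTIALS = new HashMap<>();

    public static synchronized void addToken(String service, byte[] token) {
        CREDENTIALS.put(service, token); // last writer wins for a given service
    }

    public static synchronized byte[] getToken(String service) {
        return CREDENTIALS.get(service);
    }

    public static void main(String[] args) {
        // Two jobs in the same TM JVM store a token for the same service:
        addToken("hdfs://namenode:8020", new byte[]{1}); // "job A" token
        addToken("hdfs://namenode:8020", new byte[]{2}); // "job B" overwrites it
        // Job A now silently uses job B's token, which is why per-job tokens
        // would require proper multi-UGI handling within a single JVM.
        System.out.println(getToken("hdfs://namenode:8020")[0]);
    }
}
```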
>> I'm not saying it's impossible to change that, but I think this is a complexity the initial implementation is not required to contain. Additionally, we've not seen such a need from the user side. If the need arises later on, another FLIP on this topic can be created and discussed. Proper multi-UGI handling within a single JVM is a topic where several rounds of deep dives with the Hadoop/YARN guys are required.
>>
>> > single DTM instance embedded with the ResourceManager (the Flink component)
>>
>> Could you point to a code where you think it could be added exactly? A helping hand is welcome here 🙂
>>
>> > Then the single (initial) implementation should work with all the deployment modes out of the box (which is not what the FLIP suggests). Is that correct?
>>
>> All deployment modes (per-job, per-app, ...) are planned to be tested and expected to work with the initial implementation, however not all deployment targets (k8s, local, ...) are intended to be tested. Per deployment target a new Jira needs to be created, where I expect a small amount of code to be added and a relatively expensive testing effort to be required.
>>
>> > I've taken a look into the prototype and in the "YarnClusterDescriptor" you're injecting a delegation token into the AM [1] (that's obtained using the provided keytab). If I understand this correctly from the previous discussion / FLIP, this is to support log aggregation and the DT has a limited validity. How is this DT going to be renewed?
>>
>> You're clever and touched a limitation which Spark has too. In short, after the DT reaches its max lifetime, log aggregation stops. I've had several deep-dive rounds with the YARN guys during my Spark years because I wanted to fill this gap. They couldn't provide any way to re-inject the newly obtained DT, so in the end I gave up on this.
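Coming back to the heartbeat piggybacking proposed at the top of the thread, the reconciliation logic can be sketched roughly as follows. All class and method names here are illustrative assumptions, not the real TaskExecutorHeartbeatPayload / TaskExecutorGateway API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of heartbeat-driven token reconciliation: each TM reports the id of
// the token it currently holds; the DTM pushes an update only when the report
// is stale. Illustrative names only, not actual Flink API.
public class TokenReconcilerSketch {
    private String latestTokenId;                                  // token currently held by the DTM
    private final Map<String, String> tokenByTm = new HashMap<>(); // last id each TM reported

    public TokenReconcilerSketch(String initialTokenId) {
        this.latestTokenId = initialTokenId;
    }

    /** Called when the renewal thread has obtained a fresh token. */
    public void onNewToken(String tokenId) {
        this.latestTokenId = tokenId;
    }

    /** Called for each heartbeat; returns true when an update RPC should be
     *  sent. No retry bookkeeping is needed: if the RPC fails, the next
     *  heartbeat report re-triggers the update. */
    public boolean onHeartbeat(String tmId, String reportedTokenId) {
        tokenByTm.put(tmId, reportedTokenId);
        return !latestTokenId.equals(reportedTokenId);
    }

    public static void main(String[] args) {
        TokenReconcilerSketch r = new TokenReconcilerSketch("token-1");
        r.onNewToken("token-2");
        // TM still reports token-1 -> the DTM should push token-2 via the new RPC.
        System.out.println(r.onHeartbeat("tm-1", "token-1")); // true
    }
}
```

The important property is that the DTM keeps no per-TM retry state; TM failure is also covered for free, because a restarted TM simply reports its (missing or stale) token on its next heartbeat.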
>>
>> BR,
>> G
>>
>> On Mon, 24 Jan 2022, 11:00 David Morávek, <d...@apache.org> wrote:
>>
>> > Hi Gabor,
>> >
>> > There is actually a huge difference between the JobManager (process) and the JobMaster (job coordinator). The naming is unfortunately a bit misleading here for historical reasons. There is a separate JobMaster for each job within a Flink cluster and each JobMaster only has a partial view of the task managers (depending on where the slots for a particular job are allocated). This means that you'll end up with N "DelegationTokenManagers" competing with each other (N = number of running jobs in the cluster).
>> >
>> > This makes me think we're mixing two abstraction levels here:
>> >
>> > a) Per-cluster delegation tokens
>> > - Simpler approach; it would involve a single DTM instance embedded with the ResourceManager (the Flink component).
>> > b) Per-job delegation tokens
>> > - More complex approach, but could be more flexible from the user side of things.
>> > - Multiple DTM instances that are bound to the JobMaster lifecycle. Delegation tokens are attached to the particular slots that are executing the job tasks instead of the whole task manager (a TM could be executing multiple jobs with different tokens).
>> > - The question is which keytab should be used for the clustering framework, to support log aggregation on YARN (an extra keytab, or the keytab that comes with the first job?).
>> >
>> > I think these are the things that need to be clarified in the FLIP before proceeding.
>> >
>> > A follow-up question for getting a better understanding of where this should be headed: Are there any use cases where a user may want to use a different keytab with each job, or are we fine with using a cluster-wide keytab? If we go with per-cluster keytabs, is it OK that all jobs submitted into this cluster can access it (even the future ones)? Should this be a security concern?
>> >
>> > > Presume you thought I would implement a new class with the JobManager name. The plan is not that.
>> >
>> > I've never suggested such a thing.
>> >
>> > > No. As said earlier, DT handling is planned to be done completely in Flink. The DTM has a renewal thread which re-obtains tokens at the proper time when needed.
>> >
>> > Then the single (initial) implementation should work with all the deployment modes out of the box (which is not what the FLIP suggests). Is that correct?
>> >
>> > If the cluster framework also requires delegation tokens for its inner workings (this IMO only applies to YARN), it might need an extra step (injecting the token into the application master container).
>> >
>> > Separating the individual layers (actual Flink cluster - basically making this work with a standalone deployment / "cluster framework" - support for YARN log aggregation) in the FLIP would be useful.
>> >
>> > > Reading the linked Spark readme could be useful.
>> >
>> > I've read that, but please be patient with the questions; Kerberos is not an easy topic to get into and I've had very little contact with it in the past.
>> >
>> > > https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
>> >
>> > I've taken a look into the prototype and in the "YarnClusterDescriptor" you're injecting a delegation token into the AM [1] (that's obtained using the provided keytab). If I understand this correctly from the previous discussion / FLIP, this is to support log aggregation and the DT has a limited validity. How is this DT going to be renewed?
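On the renewal question raised above: Hadoop-style delegation tokens typically have both a renewal interval and a hard max lifetime. A DTM-style renewal thread re-obtains tokens some fraction into the validity period; past the max lifetime nothing helps components that hold on to the originally issued token (which is the YARN log aggregation limitation discussed in this thread). A minimal scheduling sketch — the 0.75 ratio is an assumption, a convention used by similar implementations such as Spark's, not a value taken from the FLIP:

```java
import java.time.Duration;
import java.time.Instant;

// Minimal sketch of delegation-token renewal scheduling.
// Names and the 0.75 ratio are illustrative.
public class RenewalSketch {
    static final double RENEWAL_RATIO = 0.75;

    /** When should the renewal thread re-obtain a token that was issued at
     *  {@code issued} and expires at {@code expires}? */
    public static Instant nextRenewal(Instant issued, Instant expires) {
        long validityMillis = Duration.between(issued, expires).toMillis();
        return issued.plusMillis((long) (validityMillis * RENEWAL_RATIO));
    }

    /** Past the token's max lifetime, re-obtaining cannot help components that
     *  keep the originally issued token (e.g. YARN log aggregation). */
    public static boolean pastMaxLifetime(Instant firstIssued, Instant now, Duration maxLifetime) {
        return now.isAfter(firstIssued.plus(maxLifetime));
    }

    public static void main(String[] args) {
        Instant issued = Instant.parse("2022-01-24T10:00:00Z");
        Instant expires = issued.plus(Duration.ofHours(24));
        // Renew at 75% of a 24h validity -> 18h after issuance.
        System.out.println(nextRenewal(issued, expires)); // 2022-01-25T04:00:00Z
    }
}
```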
>> >
>> > [1]
>> > https://github.com/gaborgsomogyi/flink/commit/8ab75e46013f159778ccfce52463e7bc63e395a9#diff-02416e2d6ca99e1456f9c3949f3d7c2ac523d3fe25378620c09632e4aac34e4eR1261
>> >
>> > Best,
>> > D.
>> >
>> > On Fri, Jan 21, 2022 at 9:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>> >
>> > > Here is the exact class (I'm on mobile so I haven't had a look at the exact class name):
>> > >
>> > > https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
>> > >
>> > > That keeps track of the TMs where the tokens can be sent to.
>> > >
>> > > > My feeling would be that we shouldn't really introduce a new component with a custom lifecycle, but rather we should try to incorporate this into existing ones.
>> > >
>> > > Can you be more specific? Presume you thought I would implement a new class with the JobManager name. The plan is not that.
>> > >
>> > > > If I understand this correctly, this means that we then push the token renewal logic to YARN.
>> > >
>> > > No. As said earlier, DT handling is planned to be done completely in Flink. The DTM has a renewal thread which re-obtains tokens at the proper time when needed. YARN log aggregation is a totally different feature, where YARN does the renewal. Log aggregation was an example of why the code can't be 100% reusable for all resource managers. Reading the linked Spark readme could be useful.
>> > >
>> > > G
>> > >
>> > > On Fri, 21 Jan 2022, 21:05 David Morávek, <d...@apache.org> wrote:
>> > >
>> > > > > JobManager is the Flink class.
>> > > >
>> > > > There is no such class in Flink. The closest thing to the JobManager is a ClusterEntrypoint. The cluster entrypoint spawns a new RM Runner & Dispatcher Runner that start participating in the leader election. Once they gain leadership they spawn the actual underlying instances of these two "main components".
>> > > >
>> > > > My feeling would be that we shouldn't really introduce a new component with a custom lifecycle, but rather we should try to incorporate this into existing ones.
>> > > >
>> > > > My biggest concerns would be:
>> > > >
>> > > > - How would the lifecycle of the new component look with regard to HA setups? If we really decide to introduce a completely new component, how should this work in case of multiple JobManager instances?
>> > > > - Which components does it talk to / how? For example, how does the broadcast of a new token to the task managers (TaskManagerGateway) look? Do we simply introduce a new RPC on the ResourceManagerGateway that broadcasts it, or does the new component need to do some kind of bookkeeping of the task managers that it needs to notify?
>> > > >
>> > > > > YARN based HDFS log aggregation would not work by dropping that code. Just to be crystal clear, the actual implementation contains this for exactly this reason.
>> > > >
>> > > > This is the missing part +1. If I understand this correctly, this means that we then push the token renewal logic to YARN. How do you plan to implement the renewal logic on k8s?
>> > > >
>> > > > D.
>> > > >
>> > > > On Fri, Jan 21, 2022 at 8:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>> > > >
>> > > > > > I think we might both mean something different by the RM.
>> > > > >
>> > > > > You feel it well; I've not specified these terms well in the explanation.
>> > > > > By RM I meant the resource management framework. JobManager is the Flink class. This means that inside the JM instance there will be a DTM instance, so they would have the same lifecycle. Hope I've answered the question.
>> > > > >
>> > > > > > If we have tokens available on the client side, why do we need to set them into the AM (yarn specific concept) launch context?
>> > > > >
>> > > > > YARN based HDFS log aggregation would not work by dropping that code. Just to be crystal clear, the actual implementation contains this for exactly this reason.
>> > > > >
>> > > > > G
>> > > > >
>> > > > > On Fri, 21 Jan 2022, 20:12 David Morávek, <d...@apache.org> wrote:
>> > > > >
>> > > > > > Hi Gabor,
>> > > > > >
>> > > > > > > 1. One thing is important: token management is planned to be done generically within Flink and not scattered in RM specific code. JobManager has a DelegationTokenManager which obtains tokens from time to time (if configured properly). The JM knows which TaskManagers are in place so it can distribute the tokens to all TMs. That's it basically.
>> > > > > >
>> > > > > > I think we might both mean something different by the RM. JobManager is basically just a process encapsulating multiple components, one of which is a ResourceManager, which is the component that manages task manager registrations [1]. There is more or less a single implementation of the RM with pluggable drivers for the active integrations (yarn, k8s).
>> > > > > >
>> > > > > > It would be great if you could share more details of how exactly the DTM is going to fit into the current JM architecture.
>> > > > > >
>> > > > > > > 2. 99.9% of the code is generic but each RM handles tokens differently. A good example is YARN, which obtains tokens on the client side and then sets them on the newly created AM container launch context. This is purely YARN specific and can't be spared. With my actual plans standalone can be changed to use the framework. By using it I mean no RM specific DTM or whatsoever is needed.
>> > > > > >
>> > > > > > If we have tokens available on the client side, why do we need to set them into the AM (yarn specific concept) launch context? Why can't we simply send them to the JM, e.g. as a parameter of the job submission / via a separate RPC call? There might be something I'm missing due to limited knowledge, but handling the token on the "cluster framework" level doesn't seem necessary.
>> > > > > >
>> > > > > > [1]
>> > > > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#jobmanager
>> > > > > >
>> > > > > > Best,
>> > > > > > D.
>> > > > > >
>> > > > > > On Fri, Jan 21, 2022 at 7:48 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Oh and one more thing: I'm planning to add this feature in small chunks of PRs because security is a super hairy area. That way reviewers can more easily grasp the concept.
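The YARN-specific step debated above (the client obtains tokens and hands them to the AM through its launch context) can be modeled with a self-contained sketch. In the real flow this goes through Hadoop's Credentials serialization and ContainerLaunchContext#setTokens; the classes below are simplified stand-ins, not the Hadoop API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Self-contained model of why AM token injection is purely YARN specific:
// tokens are serialized into an opaque buffer on the launch context, and once
// the AM is started the client has no way to replace them. Names are
// illustrative stand-ins for Hadoop's Credentials / ContainerLaunchContext.
public class AmTokenInjectionSketch {
    // Stand-in for ContainerLaunchContext: YARN only accepts an opaque buffer.
    public static class LaunchContext {
        private ByteBuffer tokens;
        public void setTokens(ByteBuffer tokens) { this.tokens = tokens; }
        public ByteBuffer getTokens() { return tokens; }
    }

    public static LaunchContext createAmContext(byte[] serializedTokens) {
        LaunchContext ctx = new LaunchContext();
        // After launch this buffer is frozen from the client's perspective,
        // which is why YARN log aggregation stops at the token's max lifetime.
        ctx.setTokens(ByteBuffer.wrap(serializedTokens));
        return ctx;
    }

    public static void main(String[] args) {
        byte[] tokens = "hdfs-delegation-token".getBytes(StandardCharsets.UTF_8);
        LaunchContext amContext = createAmContext(tokens);
        System.out.println(amContext.getTokens().remaining());
    }
}
```

This also makes the layering argument from the thread concrete: everything up to producing the serialized token blob is generic DTM work; only the final hand-off to the launch context belongs to the YARN integration.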
>> > > > > > >
>> > > > > > > On Fri, 21 Jan 2022, 18:03 David Morávek, <d...@apache.org> wrote:
>> > > > > > >
>> > > > > > > > Hi Gabor,
>> > > > > > > >
>> > > > > > > > thanks for drafting the FLIP, I think having solid Kerberos support is crucial for many enterprise deployments.
>> > > > > > > >
>> > > > > > > > I have multiple questions regarding the implementation (note that I have very limited knowledge of Kerberos):
>> > > > > > > >
>> > > > > > > > 1) If I understand it correctly, we'll only obtain tokens in the job manager and then we'll distribute them via RPC (which needs to be secured).
>> > > > > > > >
>> > > > > > > > Can you please outline how the communication will look? Is the DelegationTokenManager going to be a part of the ResourceManager? Can you outline its lifecycle / how it's going to be integrated there?
>> > > > > > > >
>> > > > > > > > 2) Do we really need YARN / k8s specific implementations? Is it possible to obtain / renew a token in a generic way? Maybe to rephrase that, is it possible to implement a DelegationTokenManager for standalone Flink? If we're able to solve this point, it could be possible to target all deployment scenarios with a single implementation.
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > D.
>> > > > > > > >
>> > > > > > > > On Fri, Jan 14, 2022 at 3:47 AM Junfan Zhang <zuston.sha...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hi G
>> > > > > > > > >
>> > > > > > > > > Thanks for your detailed explanation. I have gotten your thoughts, and in any way this proposal is a great improvement.
>> > > > > > > > >
>> > > > > > > > > Looking forward to your implementation and I will keep my focus on it. Thanks again.
>> > > > > > > > >
>> > > > > > > > > Best
>> > > > > > > > > JunFan.
>> > > > > > > > > On Jan 13, 2022, 9:20 PM +0800, Gabor Somogyi <gabor.g.somo...@gmail.com>, wrote:
>> > > > > > > > > > Just to confirm, keeping "security.kerberos.fetch.delegation-token" is added to the doc.
>> > > > > > > > > >
>> > > > > > > > > > BR,
>> > > > > > > > > > G
>> > > > > > > > > >
>> > > > > > > > > > On Thu, Jan 13, 2022 at 1:34 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi JunFan,
>> > > > > > > > > > >
>> > > > > > > > > > > > By the way, maybe this should be added in the migration plan or integration section in FLIP-211.
>> > > > > > > > > > >
>> > > > > > > > > > > Going to add this soon.
>> > > > > > > > > > >
>> > > > > > > > > > > > Besides, I have a question that the KDC will collapse when the cluster reaches the 200 nodes you described in the google doc. Do you have any attachment or reference to prove it?
>> > > > > > > > > > >
>> > > > > > > > > > > "KDC *may* collapse under some circumstances" is the proper wording.
>> > > > > > > > > > >
>> > > > > > > > > > > We have several customers who are executing workloads on Spark/Flink.
>> > > > > > > > > > > Most of the time I'm facing their daily issues, which are heavily environment and use-case dependent. I've seen various cases:
>> > > > > > > > > > > * where the mentioned ~1k nodes were working fine
>> > > > > > > > > > > * where the KDC thought the number of requests was coming from a DDOS attack so it discontinued authentication
>> > > > > > > > > > > * where the KDC was simply not responding because of the load
>> > > > > > > > > > > * where the KDC intermittently had some outage (this was the most nasty thing)
>> > > > > > > > > > >
>> > > > > > > > > > > Since you're managing a relatively big cluster, you know that the KDC is not only used by Spark/Flink workloads but the whole company IT infrastructure is bombing it, so it really depends on other factors too whether the KDC is reaching its limit or not. Not sure what kind of evidence you are looking for, but I'm not authorized to share any information about our clients' data.
>> > > > > > > > > > >
>> > > > > > > > > > > One thing is for sure: the more external system types are used in workloads (for example HDFS, HBase, Hive, Kafka) which authenticate through the KDC, the more possibility there is to reach this threshold when the cluster is big enough.
>> > > > > > > > > > >
>> > > > > > > > > > > All in all, this feature is here to help all users never reach this limitation.
>> > > > > > > > > > >
>> > > > > > > > > > > BR,
>> > > > > > > > > > > G
>> > > > > > > > > > >
>> > > > > > > > > > > On Thu, Jan 13, 2022 at 1:00 PM 张俊帆 <zuston.sha...@gmail.com> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi G
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks for your quick reply. I think reserving the config of *security.kerberos.fetch.delegation-token* and simply disabling the token fetching is a good idea. By the way, maybe this should be added in the migration plan or integration section in FLIP-211.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Besides, I have a question that the KDC will collapse when the cluster reaches the 200 nodes you described in the google doc. Do you have any attachment or reference to prove it? Because in our internal per-cluster, the nodes reach > 1000 and the KDC looks good. Did I miss or misunderstand something? Please correct me.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best
>> > > > > > > > > > > > JunFan.
>> > > > > > > > > > > > On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org, wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ