> but it can happen that the JobMaster+TM collaborate to run stuff without the TM being registered at the RM
Honestly I'm not educated enough within Flink to give an example of such a scenario. Until now I thought the JM defines tasks to be done and the TM just blindly connects to external systems and does the processing. All in all, if external systems can be touched when JM + TM collaboration happens then we need to consider that in the design. Since I don't have an example scenario I don't know what exactly needs to be solved. I think we need an example case to decide whether we face a real issue or the design is not leaking.

On Thu, Feb 3, 2022 at 2:12 PM Chesnay Schepler <ches...@apache.org> wrote:

> > Just to learn something new. I think local recovery is clear to me which is not touching external systems like Kafka or so (correct me if I'm wrong). Is it possible that in such a case the user code just starts to run blindly w/o JM coordination and connects to external systems to do data processing?

> Local recovery itself shouldn't touch external systems; the TM cannot just run user-code without the JobMaster being involved, but it can happen that the JobMaster+TM collaborate to run stuff without the TM being registered at the RM.

> On 03/02/2022 13:48, Gabor Somogyi wrote:

> > Any error in loading the provider (be it by accident or explicit checks) then is a setup error and we can fail the cluster.

> Fail fast is a good direction in my view. In Spark I wanted to go in this direction but there were other opinions, so there, if a provider is not loaded, the workload goes further. Of course the processing will fail if the token is missing...

> > Requiring HBase (and Hadoop for that matter) to be on the JM system classpath would be a bit unfortunate. Have you considered loading the providers as plugins?

> Even if it's unfortunate, the actual implementation is depending on that already. Moving HBase and/or all token providers into plugins is a possibility. That way, if one wants to use a specific provider then a plugin needs to be added. If we would like to go in this direction I would do that in a separate FLIP to avoid feature creep here. The actual FLIP already covers several thousand lines of code changes.

> > This is missing from the FLIP. From my experience with the metric reporters, having the implementation rely on the configuration is really annoying for testing purposes. That's why I suggested factories; they can take care of extracting all parameters that the implementation needs, and then pass them nicely via the constructor.

> ServiceLoader provided services must have a no-arg constructor, so no parameters can be passed. As a side note, testing delegation token providers is pain in the ass and not possible with automated tests without creating a fully featured Kerberos cluster with KDC, HDFS, HBase, Kafka, etc. We've had several tries in Spark but then gave it up because of the complexity and the flakiness of it, so I wouldn't care much about unit testing. The sad truth is that most of the token providers can only be tested manually on a cluster.

> Of course this doesn't mean that the whole code is not intended to be covered with tests. I mean a couple of parts can be automatically tested, but providers are not such.

> > This also implies that any fields of the provider wouldn't inherently have to be mutable.

> I think this is not an issue. A provider connects to a service, obtains token(s) and then closes the connection; I've never seen the need for an intermediate state. I've just mentioned the singleton behavior to be clear.
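(Aside, for illustration only: a minimal sketch, not the FLIP's actual API, of what such a stateless, ServiceLoader-loadable provider could look like. The interface shape is only assumed from the methods mentioned in this thread — name(), init(Configuration), delegationTokensRequired(), returning Hadoop Credentials — and the HBase classpath check is purely hypothetical.)

    import org.apache.flink.configuration.Configuration;
    import org.apache.hadoop.security.Credentials;

    // Assumed interface shape, based only on methods mentioned in this thread;
    // not the final FLIP-211 API.
    interface DelegationTokenProvider {
        String name();
        void init(Configuration configuration);
        boolean delegationTokensRequired();
        Credentials obtainDelegationTokens() throws Exception;
    }

    // Hypothetical provider: no-arg constructor (required by ServiceLoader),
    // no mutable token state kept between calls.
    class ExampleHBaseDelegationTokenProvider implements DelegationTokenProvider {
        // Flink configuration, kept only so obtainDelegationTokens() can read
        // service-specific settings from it later.
        private Configuration flinkConf;

        public ExampleHBaseDelegationTokenProvider() {}

        @Override
        public String name() {
            return "hbase";
        }

        @Override
        public void init(Configuration configuration) {
            this.flinkConf = configuration;
        }

        @Override
        public boolean delegationTokensRequired() {
            // The provider decides for itself: obtain tokens only if HBase is on the classpath.
            try {
                Class.forName("org.apache.hadoop.hbase.HBaseConfiguration");
                return true;
            } catch (ClassNotFoundException e) {
                return false;
            }
        }

        @Override
        public Credentials obtainDelegationTokens() throws Exception {
            Credentials credentials = new Credentials();
            // Connect to the external service, obtain token(s), close the connection.
            // The actual token-obtaining logic is service specific and omitted here.
            return credentials;
        }
    }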
> > One example is a TM restart + local recovery, where the TM eagerly offers the previous set of slots to the leading JM.

> Just to learn something new. I think local recovery is clear to me which is not touching external systems like Kafka or so (correct me if I'm wrong). Is it possible that in such a case the user code just starts to run blindly w/o JM coordination and connects to external systems to do data processing?

> On Thu, Feb 3, 2022 at 1:09 PM Chesnay Schepler <ches...@apache.org> wrote:

>> 1)
>> The manager certainly shouldn't check for specific implementations. The problem with classpath-based checks is it can easily happen that the provider can't be loaded in the first place (e.g., if you don't use reflection, which you currently kinda force), and in that case Flink can't tell whether the token is not required or the cluster isn't set up correctly. As I see it we shouldn't try to be clever; if the user wants Kerberos, then have him enable the providers. Any error in loading the provider (be it by accident or explicit checks) then is a setup error and we can fail the cluster. If we still want to auto-detect whether the provider should be used, note that using factories would make this easier; the factory can check the classpath (not having any direct dependencies on HBase avoids the case above), and the provider no longer needs reflection because it will only be used iff HBase is on the CP.

>> Requiring HBase (and Hadoop for that matter) to be on the JM system classpath would be a bit unfortunate. Have you considered loading the providers as plugins?

>> 2) > DelegationTokenProvider#init method

>> This is missing from the FLIP. From my experience with the metric reporters, having the implementation rely on the configuration is really annoying for testing purposes. That's why I suggested factories; they can take care of extracting all parameters that the implementation needs, and then pass them nicely via the constructor. This also implies that any fields of the provider wouldn't inherently have to be mutable.

>> > workloads are not yet running until the initial token set is propagated.

>> This isn't necessarily true. It can happen that tasks are being deployed to the TM without it having registered with the RM; there is currently no requirement that a TM must be registered before it may offer slots / accept task submissions. One example is a TM restart + local recovery, where the TM eagerly offers the previous set of slots to the leading JM.

>> On 03/02/2022 12:39, Gabor Somogyi wrote:

>> Thanks for the quick response! Appreciate your invested time...

>> G

>> On Thu, Feb 3, 2022 at 11:12 AM Chesnay Schepler <ches...@apache.org> wrote:

>>> Thanks for answering the questions!

>>> 1) Does the HBase provider require HBase to be on the classpath?

>> To be instantiated no, to obtain a token yes.

>>> If so, then could it even be loaded if HBase is not on the classpath?

>> The provider can be loaded, but inside the provider it would detect whether HBase is on the classpath. Just to be crystal clear, this is the actual implementation that I would like to take over into the Provider.
>> Please see: https://github.com/apache/flink/blob/e6210d40491ff28c779b8604e425f01983f8a3d7/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L243-L254

>> I've considered loading only the necessary Providers but that would mean a generic Manager needs to know that if the newly loaded Provider is instanceof HBaseDelegationTokenProvider, then it needs to be skipped. I think it would add unnecessary complexity to the Manager and it would contain ugly code parts (at least in my view ugly), like this:

>> if (provider instanceof HBaseDelegationTokenProvider && hbaseIsNotOnClasspath()) {
>>     // Skip intentionally
>> } else if (provider instanceof SomethingElseDelegationTokenProvider && somethingElseIsNotOnClasspath()) {
>>     // Skip intentionally
>> } else {
>>     providers.put(provider.serviceName(), provider);
>> }

>> I think the least code and clearest approach is to load the providers and decide inside whether everything is given to obtain a token.

>>> If not, then you're assuming the classpath of the JM/TM to be the same, which isn't necessarily true (in general; and also if HBase is loaded from the user-jar).

>> I'm not assuming that the classpath of JM/TM must be the same. If the HBase jar is coming from the user-jar then the HBase code is going to use UGI within the JVM when authentication is required. Of course I've not yet tested it within Flink but in Spark it is working fine. All in all, the JM/TM classpath may be different but on both sides the HBase jar must exist somehow.

>>> 2) None of the *Providers* in your PoC get access to the configuration. Only the *Manager* does. Note that I do not know whether there is a need for the providers to have access to the config, as that's very implementation specific I suppose.

>> You're right. Since this is just a POC and I don't have a green light, I've not put too much effort into a proper self-review. The DelegationTokenProvider#init method must get the Flink configuration. The reason behind this is that several further configurations can be found using that. A good example is to get the Hadoop conf. The rationale behind it is the same as before: it would be good to create as generic a Manager as possible. To be more specific, some code must load the Hadoop conf, which could be the Manager or the Provider. If the Manager does that then the generic Manager must be modified every time something special is needed for a new provider. This could be super problematic when a custom provider is written.

>>> 10) I'm not sure myself. It could be something as trivial as creating some temporary directory in HDFS I suppose.

>> I've not found such a task. YARN and K8S are not expecting such things from executors and workloads are not yet running until the initial token set is propagated.

>>> On 03/02/2022 10:23, Gabor Somogyi wrote:

>>> Please see my answers inline. Hope I provided satisfying answers to all questions.

>>> G

>>> On Thu, Feb 3, 2022 at 9:17 AM Chesnay Schepler <ches...@apache.org> wrote:

>>> I have a few questions that I'd appreciate if you could answer.

>>> 1. How does the Provider know whether it is required or not?

>>> All providers which are registered properly are going to be loaded and asked to obtain tokens.
>>> Worth mentioning that every provider has the right to decide whether it wants to obtain tokens or not (bool delegationTokensRequired()). For instance if a provider detects that HBase is not on the classpath or not configured properly then no tokens are obtained from that specific provider.

>>> You may ask how a provider is registered. Here it is: the provider is on the classpath + there is a META-INF file which contains the name of the provider, for example:
>>> META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
>>> <https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1#diff-b65ee7e64c5d2dfbb683d3569fc3e42f4b5a8052ab83d7ac21de5ab72f428e0b>

>>> 2. How does the configuration of Providers work (how do they get access to a configuration)?

>>> Flink configuration is going to be passed to all providers. Please see the POC here:
>>> https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1
>>> Service-specific configurations are loaded on-the-fly. For example in the HBase case it looks for the HBase configuration class, which will be instantiated within the provider.

>>> 3. How does a user select providers? (Is it purely based on the provider being on the classpath?)

>>> Providers can be explicitly turned off with the following config: "security.kerberos.tokens.${name}.enabled". I've never seen 2 different implementations exist for a specific external service, but if this edge case existed then the mentioned config needs to be added, and a new provider with a different name needs to be implemented and registered. All in all, we've seen that provider handling is not a user-specific task but a cluster-admin one. If a specific provider is needed then it's implemented once per company, registered once to the clusters, and then all users may or may not use the obtained tokens.

>>> Worth mentioning that the system will know which token needs to be used when HDFS is accessed; this part is automatic.

>>> 4. How can a user override an existing provider?

>>> Please see the previous bullet point.

>>> 5. What is DelegationTokenProvider#name() used for?

>>> All providers which are registered properly (on classpath + META-INF entry) are on by default. With "security.kerberos.tokens.${name}.enabled" a specific provider can be turned off. Additionally I intend to use this in log entries later on for debugging purposes. For example "hadoopfs provider obtained 2 tokens with ID...". This would help to see what is happening with tokens and when. The same applies to the TaskManager side: "2 hadoopfs provider tokens arrived with ID...". Important to note that the secret part will be hidden in the mentioned log entries to keep the attack surface low.

>>> 6. What happens if the names of 2 providers are identical?

>>> I presume you mean 2 different classes which are both registered and have the same logic inside. In this case both will be loaded and both are going to obtain token(s) for the same service. Both obtained token(s) are going to be added to the UGI. As a result the second will overwrite the first, but the order is not defined. Since both token(s) are valid no matter which one is used, access to the external system will work.

>>> When the class names are the same then the service loader only loads a single entry because services are singletons. That's the reason why state inside providers is not advised.
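(For illustration only, and not taken from the linked POC: a rough sketch of how a manager could discover such providers via ServiceLoader and honor the "security.kerberos.tokens.${name}.enabled" flag. It reuses the assumed DelegationTokenProvider interface from the earlier sketch; the class and helper names here are made up.)

    import java.util.HashMap;
    import java.util.Map;
    import java.util.ServiceLoader;
    import org.apache.flink.configuration.Configuration;

    // Registration (from the thread): a file named
    //   META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
    // containing the fully-qualified provider class name, e.g. the hypothetical
    //   ExampleHBaseDelegationTokenProvider from the sketch above.
    class DelegationTokenManagerSketch {

        static Map<String, DelegationTokenProvider> loadProviders(Configuration conf) {
            Map<String, DelegationTokenProvider> providers = new HashMap<>();
            for (DelegationTokenProvider provider :
                    ServiceLoader.load(DelegationTokenProvider.class)) {
                // Providers are on by default; a specific one can be turned off via
                // security.kerberos.tokens.${name}.enabled.
                boolean enabled =
                        conf.getBoolean(
                                "security.kerberos.tokens." + provider.name() + ".enabled", true);
                if (!enabled) {
                    continue;
                }
                provider.init(conf);
                // If two providers report the same name, the one loaded later simply
                // replaces the earlier one here; the order is not defined.
                providers.put(provider.name(), provider);
            }
            return providers;
        }
    }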
>>> 7. Will we directly load the provider, or first load a factory (usually preferable)?

>>> The intention is to load providers directly by the DTM. We can add an extra layer to have a factory but after consideration I came to the conclusion that it would be overkill in this case. Please have a look at how it's planned to load providers now:
>>> https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1#diff-d56a0bc77335ff23c0318f6dec1872e7b19b1a9ef6d10fff8fbaab9aecac94faR54-R81

>>> 8. What is the Credentials class (it would necessarily have to be a public api as well)?

>>> The Credentials class is coming from Hadoop. My main intention was not to bind the implementation to Hadoop completely, but avoiding it is not possible because of the following reasons:
>>> * Several functionalities are a must because there are no alternatives, including but not limited to login from keytab, proper TGT cache handling, passing tokens to Hadoop services like HDFS, HBase, Hive, etc.
>>> * The partial win is that the whole delegation token framework is only going to be initiated if hadoop-common is on the classpath (Hadoop is optional in the core libraries).
>>> The possibilities to eliminate Credentials from the API could be:
>>> * to convert Credentials to a byte array back and forth while a provider gives back token(s): I think this would be overkill and would make it less clear in the API what to give back that the Manager understands
>>> * to re-implement the Credentials internal structure in a POJO; here the same back-and-forth conversion would happen between provider and manager. I think this would be a reinvent-the-wheel scenario.

>>> 9. What does the TaskManager do with the received token?

>>> It puts the tokens into the UserGroupInformation instance for the current user. That way Hadoop-compatible services can pick up the tokens from there properly. This is an existing pattern inside Spark.

>>> 10. Is there any functionality in the TaskManager that could require a token on startup (i.e., before registering with the RM)?

>>> I've never seen such functionality in Spark and after analysis haven't seen it in Flink either. If you have something in mind which I've missed plz help me out.

>>> On 11/01/2022 14:58, Gabor Somogyi wrote:

>>> Hi All,

>>> Hope all of you have enjoyed the holiday season.

>>> I would like to start the discussion on FLIP-211 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework> which aims to provide a Kerberos delegation token framework that obtains/renews/distributes tokens out-of-the-box.

>>> Please be aware that the FLIP wiki area is not fully done since the discussion may change the feature in major ways.
>>> The proposal can be found in a Google doc here:
>>> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
>>> As the community agrees on the approach, the content will be moved to the wiki page.

>>> Feel free to add your thoughts to make this feature better!

>>> BR,
>>> G
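(For illustration of the TaskManager-side handling described in the answer to question 9 above: a minimal sketch of adding received tokens to the current UserGroupInformation so that Hadoop-compatible clients in the same JVM can pick them up. The serialized byte[] transport and the class/method names are assumptions; only Credentials#readTokenStorageStream and UserGroupInformation#addCredentials are existing Hadoop APIs.)

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    // Sketch: how a TaskManager could apply tokens received from the JobManager.
    final class TokenUpdaterSketch {

        static void applyTokens(byte[] serializedCredentials) throws IOException {
            // Deserialize the tokens using Hadoop's own token storage format.
            Credentials credentials = new Credentials();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(serializedCredentials))) {
                credentials.readTokenStorageStream(in);
            }
            // Make the tokens visible to Hadoop-compatible clients (HDFS, HBase, Hive, ...)
            // running in this JVM.
            UserGroupInformation.getCurrentUser().addCredentials(credentials);
        }
    }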