> but it can happen that the JobMaster+TM collaborate to run stuff without
> the TM being registered at the RM

Honestly, I'm not familiar enough with Flink internals to give an example
of such a scenario.
Until now I thought the JM defines the tasks to be done and the TM just
blindly connects to external systems and does the processing.
All in all, if external systems can be touched during JM + TM
collaboration, then we need to account for that in the design.
Since I don't have an example scenario, I don't know what exactly needs to
be solved.
I think we need an example case to decide whether we face a real issue or
whether the design has no gap.


On Thu, Feb 3, 2022 at 2:12 PM Chesnay Schepler <ches...@apache.org> wrote:

> > Just to learn something new. I think local recovery is clear to me; it
> does not touch external systems like Kafka or so (correct me if I'm
> wrong). Is it possible that in such a case the user code just starts to
> run blindly without JM coordination and connects to external systems to
> do data processing?
>
> Local recovery itself shouldn't touch external systems; the TM cannot just
> run user-code without the JobMaster being involved, but it can happen that
> the JobMaster+TM collaborate to run stuff without the TM being registered
> at the RM.
>
> On 03/02/2022 13:48, Gabor Somogyi wrote:
>
> > Any error in loading the provider (be it by accident or explicit checks)
> then is a setup error and we can fail the cluster.
>
> Fail-fast is a good direction in my view. In Spark I wanted to go in this
> direction, but there were other opinions, so there the workload continues
> even if a provider is not loaded.
> Of course the processing will fail later if the token is missing...
>
> > Requiring HBase (and Hadoop for that matter) to be on the JM system
> classpath would be a bit unfortunate. Have you considered loading the
> providers as plugins?
>
> Even if it's unfortunate, the actual implementation already depends on
> that. Moving HBase and/or all token providers into plugins is a
> possibility.
> That way, if one wants to use a specific provider, then a plugin needs to
> be added. If we want to go in this direction, I would do that in a
> separate
> FLIP to avoid feature creep here. The actual FLIP already covers
> several thousand lines of code changes.
>
> > This is missing from the FLIP. From my experience with the metric
> reporters, having the implementation rely on the configuration is really
> annoying for testing purposes. That's why I suggested factories; they can
> take care of extracting all parameters that the implementation needs, and
> then pass them nicely via the constructor.
>
> ServiceLoader-provided services must have a no-arg constructor, so no
> parameters can be passed.
> As a side note, testing delegation token providers is painful and not
> possible with automated tests without creating a fully featured
> Kerberos cluster with KDC, HDFS, HBase, Kafka, etc.
> We had several tries in Spark but gave it up because of the
> complexity and the flakiness of it, so I wouldn't care much about unit
> testing.
> The sad truth is that most of the token providers can only be tested
> manually on a cluster.
>
> Of course this doesn't mean that the code is not intended to be
> covered with tests. A couple of parts can be tested automatically, but
> the providers are not among them.
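> As a rough illustration of how the suggested factory idea could coexist
> with the ServiceLoader constraint (a minimal sketch; the interface and
> class names below are made up for this mail, not actual Flink API): the
> ServiceLoader instantiates the no-arg factory, and the factory extracts
> what it needs from the configuration and passes it via the provider
> constructor, so the provider itself stays immutable and easy to test.

```java
import java.util.Map;

public class FactorySketch {
    // Hypothetical provider: config values arrive via the constructor,
    // so fields can stay final and tests can construct it directly.
    interface DelegationTokenProvider {
        String serviceName();
    }

    static final class KafkaTokenProvider implements DelegationTokenProvider {
        private final String bootstrapServers;

        KafkaTokenProvider(String bootstrapServers) {
            this.bootstrapServers = bootstrapServers;
        }

        @Override
        public String serviceName() {
            return "kafka@" + bootstrapServers;
        }
    }

    // Only the factory needs the no-arg constructor that ServiceLoader
    // requires; it digs the parameters out of the configuration.
    static final class KafkaTokenProviderFactory {
        DelegationTokenProvider create(Map<String, String> config) {
            return new KafkaTokenProvider(
                    config.getOrDefault("kafka.bootstrap.servers", "localhost:9092"));
        }
    }

    public static void main(String[] args) {
        DelegationTokenProvider provider = new KafkaTokenProviderFactory()
                .create(Map.of("kafka.bootstrap.servers", "broker:9092"));
        System.out.println(provider.serviceName()); // prints kafka@broker:9092
    }
}
```

> Whether this extra layer is worth it is of course the open question here.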
>
> > This also implies that any fields of the provider wouldn't inherently
> have to be mutable.
>
> I think this is not an issue. A provider connects to a service, obtains
> token(s) and then closes the connection; I've never seen the need for an
> intermediate state.
> I've just mentioned the singleton behavior to be explicit.
>
> > One example is a TM restart + local recovery, where the TM eagerly
> offers the previous set of slots to the leading JM.
>
> Just to learn something new. I think local recovery is clear to me; it
> does not touch external systems like Kafka or so (correct me if I'm wrong).
> Is it possible that in such a case the user code just starts to run blindly
> without JM coordination and connects to external systems to do data processing?
>
>
> On Thu, Feb 3, 2022 at 1:09 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> 1)
>> The manager certainly shouldn't check for specific implementations.
>> The problem with classpath-based checks is it can easily happen that the
>> provider can't be loaded in the first place (e.g., if you don't use
>> reflection, which you currently kinda force), and in that case Flink can't
>> tell whether the token is not required or the cluster isn't set up
>> correctly.
>> As I see it we shouldn't try to be clever; if the user wants Kerberos,
>> then have them enable the providers. Any error in loading the provider (be
>> it by accident or explicit checks) is then a setup error and we can fail
>> the cluster.
>> If we still want to auto-detect whether the provider should be used, note
>> that using factories would make this easier; the factory can check the
>> classpath (not having any direct dependencies on HBase avoids the case
>> above), and the provider no longer needs reflection because it will only be
>> used iff HBase is on the CP.
>>
>> Requiring HBase (and Hadoop for that matter) to be on the JM system
>> classpath would be a bit unfortunate. Have you considered loading the
>> providers as plugins?
>>
>> 2) > DelegationTokenProvider#init method
>>
>> This is missing from the FLIP. From my experience with the metric
>> reporters, having the implementation rely on the configuration is really
>> annoying for testing purposes. That's why I suggested factories; they can
>> take care of extracting all parameters that the implementation needs, and
>> then pass them nicely via the constructor. This also implies that any
>> fields of the provider wouldn't inherently have to be mutable.
>>
>> > workloads are not running until the initial token set is
>> propagated.
>>
>> This isn't necessarily true. It can happen that tasks are being deployed
>> to the TM without it having registered with the RM; there is currently no
>> requirement that a TM must be registered before it may offer slots / accept
>> task submissions.
>> One example is a TM restart + local recovery, where the TM eagerly
>> offers the previous set of slots to the leading JM.
>>
>> On 03/02/2022 12:39, Gabor Somogyi wrote:
>>
>> Thanks for the quick response!
>> Appreciate your invested time...
>>
>> G
>>
>> On Thu, Feb 3, 2022 at 11:12 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> Thanks for answering the questions!
>>>
>>> 1) Does the HBase provider require HBase to be on the classpath?
>>>
>>
>> To be instantiated, no; to obtain a token, yes.
>>
>>
>>>     If so, then could it even be loaded if HBase is not on the classpath?
>>>
>>
>> The provider can be loaded, but inside the provider it would detect
>> whether HBase is on the classpath.
>> Just to be crystal clear, this is the actual implementation that I
>> would like to carry over into the provider.
>> Please see:
>> https://github.com/apache/flink/blob/e6210d40491ff28c779b8604e425f01983f8a3d7/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L243-L254
>>
>> I've considered loading only the necessary providers, but that would mean
>> a generic Manager needs to know that, if the newly loaded provider is an
>> instance of HBaseDelegationTokenProvider, then it needs to be skipped.
>> I think it would add unnecessary complexity to the Manager and it would
>> contain ugly code parts (at least in my view ugly), like this:
>>
>> if (provider instanceof HBaseDelegationTokenProvider
>>     && hbaseIsNotOnClasspath()) {
>>   // Skip intentionally
>> } else if (provider instanceof SomethingElseDelegationTokenProvider
>>     && somethingElseIsNotOnClasspath()) {
>>   // Skip intentionally
>> } else {
>>   providers.put(provider.serviceName(), provider);
>> }
>>
>> I think the approach with the least code and the most clarity is to load
>> the providers and let each decide internally whether everything is in
>> place to obtain a token.
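>> The in-provider detection can be a simple reflective lookup, along the
>> lines of the linked Utils.java (a sketch; the helper name is made up,
>> though the HBase class name is the real configuration entry point):

```java
public class ClasspathCheck {
    // True iff the given class is loadable, i.e. the library is on the classpath.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // java.util.List is always present; HBaseConfiguration is only
        // present when an HBase client jar is on the classpath.
        System.out.println(isOnClasspath("java.util.List"));
        System.out.println(isOnClasspath("org.apache.hadoop.hbase.HBaseConfiguration"));
    }
}
```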
>>
>>>     If not, then you're assuming the classpath of the JM/TM to be the
>>> same, which isn't necessarily true (in general; and also if HBase is loaded
>>> from the user-jar).
>>>
>>
>> I'm not assuming that the classpath of JM/TM must be the same. If the
>> HBase jar is coming from the user-jar, then the HBase code is going to use
>> UGI within the JVM when authentication is required.
>> Of course I've not yet tested it within Flink, but in Spark it works
>> fine.
>> All in all, the JM/TM classpaths may be different, but on both sides the
>> HBase jar must exist somehow.
>>
>>
>>> 2) None of the *Providers* in your PoC get access to the configuration.
>>> Only the *Manager* is. Note that I do not know whether there is a need
>>> for the providers to have access to the config, as that's very
>>> implementation specific I suppose.
>>>
>>
>> You're right. Since this is just a POC and I don't have a green light,
>> I've not put much effort into a proper
>> self-review. The DelegationTokenProvider#init method must get the Flink
>> configuration.
>> The reason behind this is that several further configurations can be
>> derived from it. A good example is obtaining the Hadoop conf.
>> The rationale is the same as before: it would be good to keep the
>> Manager as generic as possible.
>> To be more specific, some code must load the Hadoop conf, which could be
>> the Manager or the Provider.
>> If the Manager does that, then the generic Manager must be modified
>> every time something special is needed for a new provider.
>> This could be super problematic when a custom provider is written.
>>
>>
>>> 10) I'm not sure myself. It could be something as trivial as creating
>>> some temporary directory in HDFS I suppose.
>>>
>>
>> I've not found such a task. YARN and K8s do not expect such things
>> from executors, and workloads are not running until the initial token
>> set is propagated.
>>
>>
>>>
>>> On 03/02/2022 10:23, Gabor Somogyi wrote:
>>>
>>> Please see my answers inline. Hope I provided satisfying answers to all
>>> questions.
>>>
>>> G
>>>
>>> On Thu, Feb 3, 2022 at 9:17 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>>
>>> I have a few questions that I'd appreciate if you could answer.
>>>
>>>    1. How does the Provider know whether it is required or not?
>>>
>>> All providers which are registered properly are going to be
>>> loaded and asked to obtain tokens. Worth mentioning, every provider
>>> has the right to decide whether it wants to obtain tokens or not (boolean
>>> delegationTokensRequired()). For instance, if a provider detects that
>>> HBase is not on the classpath or not configured properly, then no tokens are
>>> obtained from that specific provider.
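>>> The described loading loop boils down to something like this sketch (the
>>> interface mirrors the FLIP's DelegationTokenProvider shape, but the code
>>> is illustrative only, not the proposed API):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProviderContractSketch {
    interface DelegationTokenProvider {
        String serviceName();
        // Lets a provider opt out, e.g. when HBase is missing from the classpath.
        boolean delegationTokensRequired();
    }

    public static void main(String[] args) {
        DelegationTokenProvider hbase = new DelegationTokenProvider() {
            public String serviceName() { return "hbase"; }
            public boolean delegationTokensRequired() { return false; } // opts out
        };
        DelegationTokenProvider hadoopfs = new DelegationTokenProvider() {
            public String serviceName() { return "hadoopfs"; }
            public boolean delegationTokensRequired() { return true; }
        };

        // The manager loads every registered provider but only asks the
        // willing ones to obtain tokens.
        Map<String, DelegationTokenProvider> active = new LinkedHashMap<>();
        for (DelegationTokenProvider p : List.of(hbase, hadoopfs)) {
            if (p.delegationTokensRequired()) {
                active.put(p.serviceName(), p);
            }
        }
        System.out.println(active.keySet()); // prints [hadoopfs]
    }
}
```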
>>>
>>> You may ask how a provider is registered. Here it is:
>>> the provider is on the classpath + there is a META-INF file which contains
>>> the name of the provider, for example:
>>> META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
>>> <https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1#diff-b65ee7e64c5d2dfbb683d3569fc3e42f4b5a8052ab83d7ac21de5ab72f428e0b>
>>>
>>>     1. How does the configuration of Providers work (how do they get
>>>    access to a configuration)?
>>>
>>> The Flink configuration is going to be passed to all providers. Please see
>>> the POC here:
>>> https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1
>>> Service-specific configurations are loaded on-the-fly. For example, in the
>>> HBase case it looks for the HBase configuration class, which will be
>>> instantiated within the provider.
>>>
>>>
>>>    1. How does a user select providers? (Is it purely based on the
>>>    provider being on the classpath?)
>>>
>>> Providers can be explicitly turned off with the following config:
>>> "security.kerberos.tokens.${name}.enabled". I've never seen 2
>>> different implementations exist for a specific
>>> external service, but if this edge case existed, then the mentioned
>>> config needs to be added, and a new provider with a different name needs to
>>> be implemented and registered.
>>> All in all, we've seen that provider handling is not a user-specific task
>>> but a cluster-admin one. If a specific provider is needed, then it's
>>> implemented once per company, registered once
>>> on the clusters, and then all users may or may not use the obtained tokens.
>>>
>>> Worth mentioning, the system will know which token needs to be used when
>>> HDFS is accessed; this part is automatic.
>>>
>>>
>>>    1. How can a user override an existing provider?
>>>
>>> Please see the previous bullet point.
>>>
>>>    1. What is DelegationTokenProvider#name() used for?
>>>
>>> All providers which are registered properly (on the classpath + a
>>> META-INF entry) are on by default. With
>>> "security.kerberos.tokens.${name}.enabled" a specific provider can be
>>> turned off.
>>> Additionally, I intend to use this name in log entries later on for
>>> debugging purposes. For example: "hadoopfs provider obtained 2 tokens with
>>> ID...". This would help show what is happening with tokens and when.
>>> The same applies to the TaskManager side: "2 hadoopfs provider
>>> tokens arrived with ID...". Important to note that the secret part will be
>>> hidden in the mentioned log entries to keep the
>>> attack surface low.
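>>> The default-on semantics of that switch could look roughly like this (a
>>> sketch; the key shape follows the FLIP, but the helper and the "hbase" /
>>> "hadoopfs" names are just example values for ${name}):

```java
import java.util.Map;

public class EnabledFlagSketch {
    // Default-on semantics for security.kerberos.tokens.${name}.enabled
    static boolean providerEnabled(Map<String, String> config, String name) {
        return Boolean.parseBoolean(
                config.getOrDefault("security.kerberos.tokens." + name + ".enabled", "true"));
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("security.kerberos.tokens.hbase.enabled", "false");
        System.out.println(providerEnabled(config, "hbase"));    // explicitly disabled
        System.out.println(providerEnabled(config, "hadoopfs")); // on by default
    }
}
```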
>>>
>>>
>>>    1. What happens if the names of 2 providers are identical?
>>>
>>> I presume you mean 2 different classes which are both registered and have
>>> the same logic inside. In this case both will be loaded and both are going
>>> to obtain token(s) for the same service.
>>> Both obtained token(s) are going to be added to the UGI. As a result, the
>>> second will overwrite the first, but the order is not defined. Since both
>>> token(s) are valid no matter which one is
>>> used, access to the external system will work.
>>>
>>> When the class names are the same, then the service loader only loads a
>>> single entry because services are singletons. That's the reason why state
>>> inside providers is not advised.
>>>
>>>
>>>    1. Will we directly load the provider, or first load a factory
>>>    (usually preferable)?
>>>
>>> I intend to load providers directly from the DTM. We can add an extra
>>> layer to have a factory, but after consideration I came to the conclusion
>>> that it would be overkill in this case.
>>> Please have a look at how it's planned to load providers now:
>>> https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1#diff-d56a0bc77335ff23c0318f6dec1872e7b19b1a9ef6d10fff8fbaab9aecac94faR54-R81
>>>
>>>     1. What is the Credentials class (it would necessarily have to be a
>>>    public api as well)?
>>>
>>> The Credentials class comes from Hadoop. My main intention was not to bind
>>> the implementation to Hadoop completely, but that is not possible for the
>>> following reasons:
>>> * Several functionalities are a must because there are no alternatives,
>>> including but not limited to login from keytab, proper TGT cache handling,
>>> and passing tokens to Hadoop services like HDFS, HBase, Hive, etc.
>>> * The partial win is that the whole delegation token framework is only
>>> going to be initiated if hadoop-common is on the classpath (Hadoop is
>>> optional in the core libraries)
>>> The possibilities to eliminate Credentials from the API could be:
>>> * converting Credentials to a byte array and back when a provider
>>> gives back token(s): I think this would be overkill and would make the
>>> API less clear about what to give back for the Manager to understand
>>> * re-implementing the Credentials internal structure in a POJO; here the
>>> same back-and-forth conversion would happen between provider and manager.
>>> I think this would be a re-invent-the-wheel scenario
>>>
>>>
>>>    1. What does the TaskManager do with the received token?
>>>
>>> It puts the tokens into the UserGroupInformation instance for the current
>>> user. That way, Hadoop-compatible services can pick up the tokens from
>>> there properly.
>>> This is an existing pattern in Spark.
>>>
>>>
>>>    1. Is there any functionality in the TaskManager that could require a
>>>    token on startup (i.e., before registering with the RM)?
>>>
>>> I've never seen such functionality in Spark, and after analysis I haven't
>>> seen it in Flink either. If you have something in mind which I've missed,
>>> please help me out.
>>>
>>>
>>>
>>> On 11/01/2022 14:58, Gabor Somogyi wrote:
>>>
>>> Hi All,
>>>
>>> Hope all of you have enjoyed the holiday season.
>>>
>>> I would like to start the discussion on FLIP-211
>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework>
>>> which
>>> aims to provide a
>>> Kerberos delegation token framework that obtains/renews/distributes tokens
>>> out-of-the-box.
>>>
>>> Please be aware that the FLIP wiki area is not fully done since the
>>> discussion may
>>> change the feature in major ways. The proposal can be found in a Google
>>> doc here
>>> <https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ>.
>>> Once the community agrees on the approach, the content will be moved to
>>> the wiki page.
>>>
>>> Feel free to add your thoughts to make this feature better!
>>>
>>> BR,
>>> G
>>>
