I don't have a good alternative solution, but it sounds to me a bit as if we
are trying to solve Kerberos' scalability problems within Flink. And even
if we do it like this, there is no guarantee that it works, because there
can be other applications bombarding the KDC with requests. From a
maintainability and separation of concerns perspective I'd rather have this
as some kind of external tool/service that makes the KDC scale better and
that Flink processes can talk to in order to obtain the tokens.
Cheers,
Till
On Thu, Feb 3, 2022 at 6:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:
Oh, and the most important reason, which I forgot:
without the feature in the FLIP, all secure workloads with delegation
tokens are going to stop when the tokens reach their max lifetime 🙂
This is around 7 days with the default config...
On Thu, Feb 3, 2022 at 5:30 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:
That's not the sole purpose of the feature, but in some environments it
caused problems.
The main intention is to not deploy the keytab to all the nodes, because
that makes the attack surface bigger, plus to reduce the KDC load.
I've already described the situation previously in this thread, so I'm
copying it here.
--------COPY--------
"KDC *may* collapse under some circumstances" is the proper wording.
We have several customers who are executing workloads on Spark/Flink. Most
of the time I'm facing their daily issues, which are heavily environment
and use-case dependent. I've seen various cases:
* where the mentioned ~1k nodes were working fine
* where the KDC thought the number of requests was coming from a DDOS
attack, so it discontinued authentication
* where the KDC was simply not responding because of the load
* where the KDC intermittently had some outage (this was the nastiest thing)
Since you're managing a relatively big cluster, you know that the KDC is
not only used by Spark/Flink workloads; the whole company IT infrastructure
is bombarding it, so it really depends on other factors too whether the KDC
reaches its limit or not. Not sure what kind of evidence you are looking
for, but I'm not authorized to share any information about our clients'
data.
One thing is for sure: the more external system types are used in workloads
(for example HDFS, HBase, Hive, Kafka) which authenticate through the KDC,
the more possible it is to reach this threshold when the cluster is big
enough.
--------COPY--------
> The FLIP mentions scaling issues with 200 nodes; it's really surprising
> to me that such a small number of requests can already cause issues.
One node/task doesn't mean one request. I've seen the following kinds of
Kerberos auth running at the same time:
HDFS, HBase, Hive, Kafka, all DBs (Oracle, MariaDB, etc...). Additionally,
one task doesn't necessarily open just one connection.
All in all I don't have steps to reproduce, but we've faced this already...
G
On Thu, Feb 3, 2022 at 5:15 PM Chesnay Schepler <ches...@apache.org>
wrote:
What I don't understand is how this could overload the KDC. Aren't
tokens valid for a relatively long time period?
For new deployments where many TMs are started at once I could imagine
it temporarily, but shouldn't the accesses to the KDC eventually
naturally spread out?
The FLIP mentions scaling issues with 200 nodes; it's really surprising
to me that such a small number of requests can already cause issues.
On 03/02/2022 16:14, Gabor Somogyi wrote:
> I would prefer not choosing the first option
Then only the second option remains.
> I am not a Kerberos expert but is it really so that every application
> that wants to use Kerberos needs to implement the token propagation
> itself? This somehow feels as if there is something missing.
OK, so first a short Kerberos + token intro.
Some basics:
* A TGT can be created from a keytab
* A TGT is needed to obtain a TGS (called a token)
* Authentication only works with a TGS -> every place where an external
system is accessed needs either a TGT or a TGS
There are basically 2 ways to authenticate to a Kerberos-secured external
system:
1. One needs a Kerberos TGT which MUST be propagated to all JVMs. Here each
and every JVM obtains a TGS by itself, which bombards the KDC so that it
may collapse.
2. One needs a Kerberos TGT which exists in only a single place (in this
case the JM). The JM gets a TGS which MUST be propagated to all TMs because
otherwise authentication fails.
Now the whole system works in a way that the keytab file (we can imagine it
as a plaintext password) is reachable on all nodes.
This is a relatively huge attack surface. Now the main intention is:
* Instead of propagating the keytab file to all nodes, propagate a TGS
which has a limited lifetime (more secure)
* Do the TGS generation in a single place so the KDC may not collapse + a
keytab existing only on a single node can be better protected
As a final conclusion, if there is a place which is expected to do Kerberos
authentication then it's a MUST to have either a TGT or a TGS.
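To make the second approach concrete, here is a minimal sketch of what the
single token-obtaining place (the JM) could do, assuming Hadoop's security
API is on the classpath; principal, keytabPath and renewer are placeholder
parameters, not names from the FLIP:

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public class TokenObtainSketch {
        public static Credentials obtainHdfsTokens(
                String principal, String keytabPath, String renewer)
                throws Exception {
            // TGT: log in from the keytab in this single JVM only (the JM)
            UserGroupInformation ugi =
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                    principal, keytabPath);
            Credentials credentials = new Credentials();
            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
                // TGS: obtain delegation tokens; only this place talks to
                // the KDC, the TMs just receive the result
                FileSystem fs = FileSystem.get(new Configuration());
                fs.addDelegationTokens(renewer, credentials);
                return null;
            });
            // propagate these to the TMs instead of the keytab
            return credentials;
        }
    }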
Right now this is done in a pretty unsecure way. The questions are the
following:
* Do we want to leave this unsecure keytab propagation like this and bomb
the KDC?
* If not, then how do we propagate the more secure token to the TMs?
If the answer to the first question is that we leave it, then the FLIP can
be abandoned and isn't worth further effort.
If the answer is that we change it, then we can talk about the how part.
G
On Thu, Feb 3, 2022 at 3:42 PM Till Rohrmann <trohrm...@apache.org>
wrote:
I would prefer not choosing the first option
> Make the TM accept tasks only after registration (not sure if it's
> possible or makes sense at all)
because it effectively means that we change how Flink's component lifecycle
works for distributing Kerberos tokens. It also effectively means that a TM
cannot make progress until connected to an RM.
I am not a Kerberos expert but is it really so that every application that
wants to use Kerberos needs to implement the token propagation itself? This
somehow feels as if there is something missing.
Cheers,
Till
On Thu, Feb 3, 2022 at 3:29 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> Isn't this something the underlying resource management system could do
> or which every process could do on its own?
I was looking for such a feature but haven't found one.
Maybe we can solve the propagation more easily, but then I'm waiting for a
better suggestion.
If anybody has a better/simpler idea then please point to a specific
feature which works on all resource management systems.
> Here's an example for the TM to run workloads without being connected
> to the RM, without ever having a valid token
All in all I see the main problem. I'm not sure what the reason is behind a
TM accepting tasks w/o registration, but it's clearly not helping here.
I basically see 2 possible solutions:
* Make the TM accept tasks only after registration (not sure if it's
possible or makes sense at all)
* Send tokens right after container creation with "updateDelegationTokens"
Not sure which one is more realistic to do since I'm not involved in the
new feature.
WDYT?
On Thu, Feb 3, 2022 at 3:09 PM Till Rohrmann <trohrm...@apache.org> wrote:
Hi everyone,
Sorry for joining this discussion late. I also did not read all responses
in this thread, so my question might already be answered: Why does Flink
need to be involved in the propagation of the tokens? Why do we need
explicit RPC calls in the Flink domain? Isn't this something the underlying
resource management system could do, or which every process could do on its
own? I am a bit worried that we are making Flink responsible for something
that it is not really designed to do.
Cheers,
Till
On Thu, Feb 3, 2022 at 2:54 PM Chesnay Schepler <ches...@apache.org> wrote:
Here's an example for the TM to run workloads without being connected to
the RM, while potentially having a valid token:
1. TM registers at RM
2. JobMaster requests slot from RM -> TM gets notified
3. JM fails over
4. TM re-offers the slot to the failed-over JobMaster
5. TM reconnects to RM at some point
Here's an example for the TM to run workloads without being connected to
the RM, without ever having a valid token:
1. TM1 has a valid token and is running some tasks.
2. TM1 crashes.
3. TM2 is started to take over, and re-uses the working directory of TM1
(new feature in 1.15!).
4. TM2 recovers the previous slot allocations.
5. TM2 is informed about the leading JM.
6. TM2 starts registration with the RM.
7. TM2 offers slots to the JobMaster.
8. TM2 accepts task submission from the JobMaster.
9. ...some time later the registration completes...
On 03/02/2022 14:24, Gabor Somogyi wrote:
> but it can happen that the JobMaster+TM collaborate to run stuff
> without the TM being registered at the RM
Honestly, I'm not educated enough within Flink to give an example of such a
scenario.
Until now I thought the JM defines the tasks to be done and the TM just
blindly connects to external systems and does the processing.
All in all, if external systems can be touched when JM + TM collaboration
happens then we need to consider that in the design.
Since I don't have an example scenario I don't know what exactly needs to
be solved.
I think we need an example case to decide whether we face a real issue or
whether the design is not leaking.
On Thu, Feb 3, 2022 at 2:12 PM Chesnay Schepler <ches...@apache.org> wrote:
> Just to learn something new. I think local recovery is clear to me,
> which is not touching external systems like Kafka or so (correct me if
> I'm wrong). Is it possible that in such a case the user code just starts
> to run blindly w/o JM coordination and connects to external systems to
> do data processing?
Local recovery itself shouldn't touch external systems; the TM cannot just
run user-code without the JobMaster being involved, but it can happen that
the JobMaster+TM collaborate to run stuff without the TM being registered
at the RM.
On 03/02/2022 13:48, Gabor Somogyi wrote:
> Any error in loading the provider (be it by accident or explicit
> checks) then is a setup error and we can fail the cluster.
Fail fast is a good direction in my view. In Spark I wanted to go in this
direction but there were other opinions, so there, if a provider is not
loaded then the workload goes further.
Of course the processing will fail if the token is missing...
> Requiring HBase (and Hadoop for that matter) to be on the JM system
> classpath would be a bit unfortunate. Have you considered loading the
> providers as plugins?
Even if it's unfortunate, the actual implementation already depends on
that. Moving HBase and/or all token providers into plugins is a
possibility.
That way, if one wants to use a specific provider then a plugin needs to be
added. If we would like to go in this direction I would do that in a
separate FLIP so as not to have feature creep here. The actual FLIP already
covers several thousand lines of code changes.
> This is missing from the FLIP. From my experience with the metric
> reporters, having the implementation rely on the configuration is really
> annoying for testing purposes. That's why I suggested factories; they
> can take care of extracting all parameters that the implementation
> needs, and then pass them nicely via the constructor.
ServiceLoader-provided services must have a no-arg constructor, so no
parameters can be passed there.
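For illustration, a rough sketch of that constraint (init and serviceName
are taken from the PoC; the exact signatures may differ):
java.util.ServiceLoader instantiates each service reflectively without
arguments, so configuration can only be handed over in a second step:

    static Map<String, DelegationTokenProvider> loadProviders(
            Configuration flinkConfiguration) {
        Map<String, DelegationTokenProvider> providers = new HashMap<>();
        for (DelegationTokenProvider provider :
                ServiceLoader.load(DelegationTokenProvider.class)) {
            // the no-arg constructor has already run at this point;
            // configuration is passed afterwards
            provider.init(flinkConfiguration);
            providers.put(provider.serviceName(), provider);
        }
        return providers;
    }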
As a side note, testing delegation token providers is a pain in the ass and
is not possible with automated tests without creating a fully featured
Kerberos cluster with KDC, HDFS, HBase, Kafka, etc.
We've had several tries in Spark but then gave up because of the complexity
and the flakiness of it, so I wouldn't care much about unit testing.
The sad truth is that most of the token providers can only be tested
manually on a cluster.
Of course this doesn't mean that the whole code is not intended to be
covered with tests. I mean, a couple of parts can be automatically tested,
but the providers are not such.
> This also implies that any fields of the provider wouldn't inherently
> have to be mutable.
I think this is not an issue. A provider connects to a service, obtains
token(s) and then closes the connection; I've never seen the need for
intermediate state.
I've just mentioned the singleton behavior to be clear.
> One example is a TM restart + local recovery, where the TM eagerly
> offers the previous set of slots to the leading JM.
Just to learn something new. I think local recovery is clear to me, which
is not touching external systems like Kafka or so (correct me if I'm
wrong).
Is it possible that in such a case the user code just starts to run blindly
w/o JM coordination and connects to external systems to do data processing?
On Thu, Feb 3, 2022 at 1:09 PM Chesnay Schepler <ches...@apache.org> wrote:
1)
The manager certainly shouldn't check for specific implementations.
The problem with classpath-based checks is that it can easily happen that
the provider can't be loaded in the first place (e.g., if you don't use
reflection, which you currently kinda force), and in that case Flink can't
tell whether the token is not required or the cluster isn't set up
correctly.
As I see it we shouldn't try to be clever; if the user wants Kerberos, then
have him enable the providers. Any error in loading the provider (be it by
accident or explicit checks) then is a setup error and we can fail the
cluster.
If we still want to auto-detect whether the provider should be used, note
that using factories would make this easier; the factory can check the
classpath (not having any direct dependencies on HBase avoids the case
above), and the provider no longer needs reflection because it will only be
used iff HBase is on the CP.
Requiring HBase (and Hadoop for that matter) to be on the JM system
classpath would be a bit unfortunate. Have you considered loading the
providers as plugins?
2) > DelegationTokenProvider#init method
This is missing from the FLIP. From my experience with the metric
reporters, having the implementation rely on the configuration is really
annoying for testing purposes. That's why I suggested factories; they can
take care of extracting all parameters that the implementation needs, and
then pass them nicely via the constructor. This also implies that any
fields of the provider wouldn't inherently have to be mutable.
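A tiny sketch of the factory shape being suggested here (the names are
illustrative, not part of the FLIP):

    public interface DelegationTokenProviderFactory {
        // the factory is what the ServiceLoader discovers; it extracts the
        // parameters the implementation needs from the Flink configuration
        // and passes them via the provider's constructor
        DelegationTokenProvider createProvider(Configuration flinkConfig);
    }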
> workloads are not running until the initial token set is propagated.
This isn't necessarily true. It can happen that tasks are being deployed to
the TM without it having registered with the RM; there is currently no
requirement that a TM must be registered before it may offer slots / accept
task submissions.
One example is a TM restart + local recovery, where the TM eagerly offers
the previous set of slots to the leading JM.
On 03/02/2022 12:39, Gabor Somogyi wrote:
Thanks for the quick response!
I appreciate the time you've invested...
G
On Thu, Feb 3, 2022 at 11:12 AM Chesnay Schepler <ches...@apache.org> wrote:
Thanks for answering the questions!
> 1) Does the HBase provider require HBase to be on the classpath?
To be instantiated, no; to obtain a token, yes.
> If so, then could it even be loaded if HBase is not on the classpath?
The provider can be loaded, but inside the provider it would detect whether
HBase is on the classpath.
Just to be crystal clear, this is the actual implementation which I would
like to take over into the Provider. Please see:
https://github.com/apache/flink/blob/e6210d40491ff28c779b8604e425f01983f8a3d7/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L243-L254
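For reference, the detection inside the provider boils down to a probe like
the following (a sketch; the linked Utils.java does the real reflective
work):

    private static boolean hbaseOnClasspath() {
        try {
            // only present when the HBase jar is on the classpath
            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration");
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }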
I've considered loading only the necessary Providers, but that would mean a
generic Manager needs to know that if the newly loaded Provider is an
instanceof HBaseDelegationTokenProvider, then it needs to be skipped.
I think it would add unnecessary complexity to the Manager and it would
contain ugly code parts (at least ugly in my view), like this:

    if (provider instanceof HBaseDelegationTokenProvider
            && hbaseIsNotOnClasspath()) {
        // Skip intentionally
    } else if (provider instanceof SomethingElseDelegationTokenProvider
            && somethingElseIsNotOnClasspath()) {
        // Skip intentionally
    } else {
        providers.put(provider.serviceName(), provider);
    }

I think the approach with the least code and the most clarity is to load
the providers and let each decide inside whether everything is in place to
obtain a token.
> If not, then you're assuming the classpath of the JM/TM to be the same,
> which isn't necessarily true (in general; and also if HBase is loaded
> from the user-jar).
I'm not assuming that the classpath of the JM/TM must be the same. If the
HBase jar comes from the user-jar then the HBase code is going to use UGI
within the JVM when authentication is required.
Of course I've not yet tested this within Flink, but in Spark it works
fine.
All in all, the JM/TM classpaths may be different, but on both sides the
HBase jar must exist somehow.
> 2) None of the Providers in your PoC get access to the configuration.
> Only the Manager is. Note that I do not know whether there is a need for
> the providers to have access to the config, as that's very
> implementation specific I suppose.
You're right. Since this is just a POC and I don't have a green light, I've
not put too much effort into a proper self-review. The
DelegationTokenProvider#init method must get the Flink configuration.
The reason behind this is that several further configurations can be found
using it. A good example is getting the Hadoop conf.
The rationale is the same as before: it would be good to keep the Manager
as generic as possible.
To be more specific, some code must load the Hadoop conf, and that could be
the Manager or the Provider.
If the Manager does it, then the generic Manager must be modified every
time something special is needed for a new provider.
This could be super problematic when a custom provider is written.
> 10) I'm not sure myself. It could be something as trivial as creating
> some temporary directory in HDFS I suppose.
I've not found such a task. YARN and K8S don't expect such things from
executors, and workloads are not running until the initial token set is
propagated.
On 03/02/2022 10:23, Gabor Somogyi wrote:
Please see my answers inline. I hope I've provided satisfying answers to
all the questions.
G
On Thu, Feb 3, 2022 at 9:17 AM Chesnay Schepler <ches...@apache.org> wrote:
I have a few questions that I'd appreciate it if you could answer.
> 1. How does the Provider know whether it is required or not?
All providers which are registered properly are going to be loaded and
asked to obtain tokens. Worth mentioning that every provider has the right
to decide whether it wants to obtain tokens or not (bool
delegationTokensRequired()). For instance, if a provider detects that HBase
is not on the classpath or is not configured properly, then no tokens are
obtained from that specific provider.
You may ask how a provider is registered. Here it is: the provider is on
the classpath + there is a META-INF file which contains the name of the
provider, for example:
META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1#diff-b65ee7e64c5d2dfbb683d3569fc3e42f4b5a8052ab83d7ac21de5ab72f428e0b
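As a hypothetical example (the interface shape is taken from the PoC and
may change), registering a custom provider would look roughly like this:

    public class MyDelegationTokenProvider implements DelegationTokenProvider {
        @Override
        public String serviceName() {
            return "myservice";
        }

        @Override
        public boolean delegationTokensRequired() {
            // e.g. check that the external system is on the classpath and
            // is configured properly
            return true;
        }
        // ... token obtaining logic ...
    }

plus one line in
META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
containing the fully qualified class name, e.g.
com.example.MyDelegationTokenProvider.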
> 2. How does the configuration of Providers work (how do they get access
> to a configuration)?
The Flink configuration is going to be passed to all providers. Please see
the POC here:
https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1
Service specific configurations are loaded on-the-fly. For example, in the
HBase case it looks for the HBase configuration class, which will be
instantiated within the provider.
> 3. How does a user select providers? (Is it purely based on the provider
> being on the classpath?)
Providers can be explicitly turned off with the following config:
"security.kerberos.tokens.${name}.enabled". I've never seen 2 different
implementations exist for one specific external service, but if this edge
case came up, then the mentioned config would need to be added, and a new
provider with a different name would need to be implemented and registered.
All in all we've seen that provider handling is not a user-specific task
but a cluster admin one. If a specific provider is needed then it's
implemented once per company, registered once on the clusters, and then all
users may or may not use the obtained tokens.
Worth mentioning that the system will know which token needs to be used
when HDFS is accessed; this part is automatic.
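For example, a hypothetical flink-conf.yaml entry to turn off the provider
whose name is "hbase" (using the config key pattern from above):

    security.kerberos.tokens.hbase.enabled: false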
> 4. How can a user override an existing provider?
Please see the previous bullet point.
> 5. What is DelegationTokenProvider#name() used for?
By default, all providers which are registered properly (on classpath +
META-INF entry) are turned on. With
"security.kerberos.tokens.${name}.enabled" a specific provider can be
turned off.
Additionally, I intend to use this in log entries later on for debugging
purposes. For example "hadoopfs provider obtained 2 tokens with ID...".
This would help to see what is happening with tokens and when. The same
applies to the TaskManager side: "2 hadoopfs provider tokens arrived with
ID...". Important to note that the secret part will be hidden in the
mentioned log entries to keep the attack surface low.
> 6. What happens if the names of 2 providers are identical?
I presume you mean 2 different classes which are both registered and have
the same logic inside. In this case both will be loaded and both are going
to obtain token(s) for the same service.
Both obtained token(s) are going to be added to the UGI. As a result the
second will overwrite the first, but the order is not defined. Since both
token(s) are valid, no matter which one is used, access to the external
system will work.
When the class names are the same then the service loader only loads a
single entry, because services are singletons. That's the reason why state
inside providers is not advised.
> 7. Will we directly load the provider, or first load a factory (usually
> preferable)?
I intend to load a provider directly by the DTM. We can add an extra layer
to have a factory, but after consideration I came to the conclusion that it
would be overkill in this case.
Please have a look at how it's planned to load providers now:
https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1#diff-d56a0bc77335ff23c0318f6dec1872e7b19b1a9ef6d10fff8fbaab9aecac94faR54-R81
> 8. What is the Credentials class (it would necessarily have to be a
> public api as well)?
The Credentials class is coming from Hadoop. My main intention was not to
bind the implementation to Hadoop completely. That is not fully possible
because of the following reasons:
* Several functionalities are a must because there are no alternatives,
including but not limited to login from keytab, proper TGT cache handling,
and passing tokens to Hadoop services like HDFS, HBase, Hive, etc.
* The partial win is that the whole delegation token framework is only
going to be initiated if hadoop-common is on the classpath (Hadoop is
optional in the core libraries)
The possibilities to eliminate Credentials from the API could be:
* to convert Credentials to a byte array and back while a provider gives
back token(s): I think this would be overkill and would make it less clear
in the API what to give back that the Manager understands
* to re-implement the Credentials internal structure in a POJO; here the
same conversion back and forth would happen between provider and manager. I
think this case would be the re-invent-the-wheel scenario
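For completeness, the byte-array conversion mentioned in the first bullet
is mechanically simple, because Hadoop's Credentials implements Writable;
it was rejected above for API-clarity reasons, not difficulty. A sketch:

    static byte[] toBytes(Credentials credentials) throws IOException {
        // Credentials is a Hadoop Writable, so it serializes itself
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        credentials.write(new DataOutputStream(bos));
        return bos.toByteArray();
    }

    static Credentials fromBytes(byte[] bytes) throws IOException {
        Credentials restored = new Credentials();
        restored.readFields(
            new DataInputStream(new ByteArrayInputStream(bytes)));
        return restored;
    }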
> 9. What does the TaskManager do with the received token?
It puts the tokens into the UserGroupInformation instance for the current
user. That way Hadoop compatible services can pick up the tokens from there
properly.
This is an existing pattern inside Spark.
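In code, the TM side is roughly the following (a sketch using Hadoop's
public UGI API; the byte-array transport is an assumption, not the FLIP's
wire format):

    static void installTokens(byte[] tokenBytes) throws IOException {
        Credentials received = new Credentials();
        received.readFields(
            new DataInputStream(new ByteArrayInputStream(tokenBytes)));
        // from here on, Hadoop compatible clients (HDFS, HBase, ...) find
        // the tokens automatically in the current user's UGI
        UserGroupInformation.getCurrentUser().addCredentials(received);
    }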
> 10. Is there any functionality in the TaskManager that could require a
> token on startup (i.e., before registering with the RM)?
I've never seen such functionality in Spark, and after analysis I haven't
seen it in Flink either. If you have something in mind which I've missed,
please help me out.
On 11/01/2022 14:58, Gabor Somogyi wrote:
Hi All,
Hope all of you have enjoyed the holiday season.
I would like to start the discussion on FLIP-211
https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework
which aims to provide a Kerberos delegation token framework that
obtains/renews/distributes tokens out-of-the-box.
Please be aware that the FLIP wiki area is not fully done, since the
discussion may change the feature in major ways. The proposal can be found
in a google doc here:
https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
As the community agrees on the approach, the content will be moved to the
wiki page.
Feel free to add your thoughts to make this feature better!
BR,
G