> There is a separate JobMaster for each job
within a Flink cluster and each JobMaster only has a partial view of the
task managers

Good point! I've had a deeper look and you're right. We definitely need to
find another place.

> Related per-cluster or per-job keytab:

In the current code a per-cluster keytab is implemented and I intend to
keep it that way within this FLIP. The reason is simple: tokens on the TM
side can be stored within the UserGroupInformation (UGI) structure, which is
global. I'm not saying it's impossible to change that, but I think this is
complexity which the initial implementation is not required to contain.
Additionally, we've not seen such a need from the user side. If the need
arises later on, another FLIP on this topic can be created and discussed.
Proper multi-UGI handling within a single JVM is a topic where several
rounds of deep-dive with the Hadoop/YARN guys are required.
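
To make the per-cluster model a bit more concrete, here is a rough sketch of a single manager that re-obtains tokens on a renewal thread and sends the same token to every registered TM. Everything in it is an assumption for illustration only: the class and method names, the 75% renewal ratio and the retry delay are hypothetical and are neither the prototype's code nor Flink or Hadoop API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch; none of these names are Flink or Hadoop API. */
public class ClusterTokenManagerSketch {

    /** A token payload plus the time at which it stops being valid. */
    public static final class Token {
        public final byte[] payload;
        public final long expiryMillis;

        public Token(byte[] payload, long expiryMillis) {
            this.payload = payload;
            this.expiryMillis = expiryMillis;
        }
    }

    // Assumption: re-obtain after 75% of the remaining token lifetime.
    private static final double RENEWAL_RATIO = 0.75;
    // Assumption: never reschedule faster than once per second.
    private static final long MIN_DELAY_MILLIS = 1_000L;

    // Stands in for "log in with the cluster-wide keytab and obtain tokens".
    private final Supplier<Token> tokenProvider;
    // Stands in for the bookkeeping of TMs and the RPC used to reach them.
    private final Map<String, Consumer<Token>> taskManagers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService renewalThread =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "token-renewal");
                t.setDaemon(true);
                return t;
            });

    public ClusterTokenManagerSketch(Supplier<Token> tokenProvider) {
        this.tokenProvider = tokenProvider;
    }

    public void registerTaskManager(String id, Consumer<Token> gateway) {
        taskManagers.put(id, gateway);
    }

    public void unregisterTaskManager(String id) {
        taskManagers.remove(id);
    }

    /** Delay until the next re-obtain, never negative. */
    public static long renewalDelayMillis(long nowMillis, long expiryMillis) {
        return Math.max(0L, (long) ((expiryMillis - nowMillis) * RENEWAL_RATIO));
    }

    /** Obtains one token and sends the same token to every registered TM. */
    public Token obtainAndBroadcast() {
        Token token = tokenProvider.get();
        taskManagers.values().forEach(gateway -> gateway.accept(token));
        return token;
    }

    /** Starts the renewal loop on the single renewal thread. */
    public void start() {
        renewalThread.execute(this::renewalLoop);
    }

    private void renewalLoop() {
        Token token = obtainAndBroadcast();
        long delay = renewalDelayMillis(System.currentTimeMillis(), token.expiryMillis);
        renewalThread.schedule(
                this::renewalLoop, Math.max(MIN_DELAY_MILLIS, delay), TimeUnit.MILLISECONDS);
    }

    public void stop() {
        renewalThread.shutdownNow();
    }
}
```

Since every TM receives the same token, the sketch needs exactly one keytab per cluster, which matches tokens ending up in a global structure on the TM side. Per-job keytabs would need per-job state on both ends.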

> single DTM instance embedded with
the ResourceManager (the Flink component)

Could you point to the code where you think it could be added exactly? A
helping hand is welcome here 🙂

> Then the single (initial) implementation should work with all the
deployments modes out of the box (which is not what the FLIP suggests). Is
that correct?

All deployment modes (per-job, per-app, ...) are planned to be tested and
are expected to work with the initial implementation; however, not all
deployment targets (k8s, local, ...) are intended to be tested. For each
deployment target a new jira needs to be created, where I expect that a
small amount of code needs to be added and a relatively expensive testing
effort is required.

> I've taken a look into the prototype and in the "YarnClusterDescriptor"
you're injecting a delegation token into the AM [1] (that's obtained using
the provided keytab). If I understand this correctly from previous
discussion / FLIP, this is to support log aggregation and DT has a limited
validity. How is this DT going to be renewed?

You're clever and touched a limitation which Spark has too. In short, after
the DT reaches its max lifetime, log aggregation stops. I had several
deep-dive rounds with the YARN guys during my Spark years because I wanted
to fill this gap. They couldn't provide us any way to re-inject the newly
obtained DT, so in the end I gave up on this.

BR,
G


On Mon, 24 Jan 2022, 11:00 David Morávek, <d...@apache.org> wrote:

> Hi Gabor,
>
> There is actually a huge difference between JobManager (process) and
> JobMaster (job coordinator). The naming is unfortunately a bit misleading
> here for historical reasons. There is a separate JobMaster for each job
> within a Flink cluster and each JobMaster only has a partial view of the
> task managers (depends on where the slots for a particular job are
> allocated). This means that you'll end up with N "DelegationTokenManagers"
> competing with each other (N = number of running jobs in the cluster).
>
> This makes me think we're mixing two abstraction levels here:
>
> a) Per-cluster delegation tokens
> - Simpler approach, it would involve a single DTM instance embedded with
> the ResourceManager (the Flink component)
> b) Per-job delegation tokens
> - More complex approach, but could be more flexible from the user side of
> things.
> - Multiple DTM instances that are bound to the JobMaster lifecycle.
> Delegation tokens are attached to the particular slots that are executing
> the job tasks instead of the whole task manager (TM could be executing
> multiple jobs with different tokens).
> - The question is which keytab should be used for the clustering framework,
> to support log aggregation on YARN (an extra keytab, keytab that comes with
> the first job?)
>
> I think these are the things that need to be clarified in the FLIP before
> proceeding.
>
> A follow-up question for getting a better understanding where this should
> be headed: Are there any use cases where user may want to use different
> keytabs with each job, or are we fine with using a cluster-wide keytab? If
> we go with per-cluster keytabs, is it OK that all jobs submitted into this
> cluster can access it (even the future ones)? Should this be a security
> concern?
>
> > Presume you thought I would implement a new class with JobManager name. The
> > plan is not that.
> >
>
> I've never suggested such a thing.
>
>
> > No. As said earlier, DT handling is planned to be done completely in
> > Flink. DTM has a renewal thread which re-obtains tokens at the proper time
> > when needed.
> >
>
> Then the single (initial) implementation should work with all the
> deployments modes out of the box (which is not what the FLIP suggests). Is
> that correct?
>
> If the cluster framework also requires a delegation token for its inner
> workings (this IMO only applies to YARN), it might need an extra step
> (injecting the token into the application master container).
>
> Separating the individual layers (actual Flink cluster - basically making
> this work with a standalone deployment  / "cluster framework" - support for
> YARN log aggregation) in the FLIP would be useful.
>
> > Reading the linked Spark readme could be useful.
> >
>
> I've read that, but please be patient with the questions, Kerberos is not
> an easy topic to get into and I've had very little contact with it in the
> past.
>
>
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> >
>
> I've taken a look into the prototype and in the "YarnClusterDescriptor"
> you're injecting a delegation token into the AM [1] (that's obtained using
> the provided keytab). If I understand this correctly from previous
> discussion / FLIP, this is to support log aggregation and DT has a limited
> validity. How is this DT going to be renewed?
>
> [1]
>
> https://github.com/gaborgsomogyi/flink/commit/8ab75e46013f159778ccfce52463e7bc63e395a9#diff-02416e2d6ca99e1456f9c3949f3d7c2ac523d3fe25378620c09632e4aac34e4eR1261
>
> Best,
> D.
>
> On Fri, Jan 21, 2022 at 9:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
> > Here is the exact class. I'm on mobile, so I have not had a look at the
> > exact class name:
> >
> >
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> > That keeps track of TMs where the tokens can be sent to.
> >
> > > My feeling would be that we shouldn't really introduce a new component
> > with
> > a custom lifecycle, but rather we should try to incorporate this into
> > existing ones.
> >
> > Can you be more specific? Presume you thought I would implement a new class
> > with JobManager name. The plan is not that.
> >
> > > If I understand this correctly, this means that we then push the token
> > renewal logic to YARN.
> >
> > No. As said earlier, DT handling is planned to be done completely in
> > Flink. DTM has a renewal thread which re-obtains tokens at the proper time
> > when needed. YARN log aggregation is a totally different feature, where
> > YARN does the renewal. Log aggregation was an example of why the code can't be
> > 100% reusable for all resource managers. Reading the linked Spark readme
> > could be useful.
> >
> > G
> >
> > On Fri, 21 Jan 2022, 21:05 David Morávek, <d...@apache.org> wrote:
> >
> > > >
> > > > JobManager is the Flink class.
> > >
> > >
> > > There is no such class in Flink. The closest thing to the JobManager
> is a
> > > ClusterEntrypoint. The cluster entrypoint spawns new RM Runner &
> > Dispatcher
> > > Runner that start participating in the leader election. Once they gain
> > > leadership they spawn the actual underlying instances of these two
> "main
> > > components".
> > >
> > > My feeling would be that we shouldn't really introduce a new component
> > with
> > > a custom lifecycle, but rather we should try to incorporate this into
> > > existing ones.
> > >
> > > My biggest concerns would be:
> > >
> > > - How would the lifecycle of the new component look like with regards
> to
> > HA
> > > setups. If we really try to decide to introduce a completely new
> > component,
> > > how should this work in case of multiple JobManager instances?
> > > - Which components does it talk to / how? For example how does the
> > > broadcast of new token to task managers (TaskManagerGateway) look like?
> > Do
> > > we simply introduce a new RPC on the ResourceManagerGateway that
> > broadcasts
> > > it or does the new component need to do some kind of bookkeeping of
> task
> > > managers that it needs to notify?
> > >
> > > > YARN based HDFS log aggregation would not work by dropping that code. Just
> > > > to be crystal clear, the actual implementation contains this for exactly
> > > > this reason.
> > > >
> > >
> > > This is the missing part +1. If I understand this correctly, this means
> > > that we then push the token renewal logic to YARN. How do you plan to
> > > implement the renewal logic on k8s?
> > >
> > > D.
> > >
> > > On Fri, Jan 21, 2022 at 8:37 PM Gabor Somogyi <
> gabor.g.somo...@gmail.com
> > >
> > > wrote:
> > >
> > > > > I think we might both mean something different by the RM.
> > > >
> > > > You feel it well, I've not specified these terms well in the
> > explanation.
> > > > RM I meant resource management framework. JobManager is the Flink
> > class.
> > > > This means that inside JM instance there will be a DTM instance, so
> > they
> > > > would have the same lifecycle. Hope I've answered the question.
> > > >
> > > > > If we have tokens available on the client side, why do we need to
> set
> > > > them
> > > > into the AM (yarn specific concept) launch context?
> > > >
> > > > YARN based HDFS log aggregation would not work by dropping that code. Just
> > > > to be crystal clear, the actual implementation contains this for exactly
> > > > this reason.
> > > >
> > > > G
> > > >
> > > > On Fri, 21 Jan 2022, 20:12 David Morávek, <d...@apache.org> wrote:
> > > >
> > > > > Hi Gabor,
> > > > >
> > > > > 1. One thing is important, token management is planned to be done
> > > > > > generically within Flink and not scattered in RM specific code.
> > > > > JobManager
> > > > > > has a DelegationTokenManager which obtains tokens time-to-time
> (if
> > > > > > configured properly). JM knows which TaskManagers are in place so
> > it
> > > > can
> > > > > > distribute it to all TMs. That's it basically.
> > > > >
> > > > >
> > > > > I think we might both mean something different by the RM.
> JobManager
> > is
> > > > > basically just a process encapsulating multiple components, one of
> > > which
> > > > is
> > > > > a ResourceManager, which is the component that manages task manager
> > > > > registrations [1]. There is more or less a single implementation of
> > the
> > > > RM
> > > > > with plugable drivers for the active integrations (yarn, k8s).
> > > > >
> > > > > It would be great if you could share more details of how exactly
> the
> > > DTM
> > > > is
> > > > > going to fit in the current JM architecture.
> > > > >
> > > > > 2. 99.9% of the code is generic but each RM handles tokens
> > > differently. A
> > > > > > good example is YARN obtains tokens on client side and then sets
> > them
> > > > on
> > > > > > the newly created AM container launch context. This is purely
> YARN
> > > > > specific
> > > > > > and cant't be spared. With my actual plans standalone can be
> > changed
> > > to
> > > > > use
> > > > > > the framework. By using it I mean no RM specific DTM or
> whatsoever
> > is
> > > > > > needed.
> > > > > >
> > > > >
> > > > > If we have tokens available on the client side, why do we need to
> set
> > > > them
> > > > > into the AM (yarn specific concept) launch context? Why can't we
> > simply
> > > > > send them to the JM, eg. as a parameter of the job submission / via
> > > > > separate RPC call? There might be something I'm missing due to
> > limited
> > > > > knowledge, but handling the token on the "cluster framework" level
> > > > doesn't
> > > > > seem necessary.
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#jobmanager
> > > > >
> > > > > Best,
> > > > > D.
> > > > >
> > > > > On Fri, Jan 21, 2022 at 7:48 PM Gabor Somogyi <
> > > gabor.g.somo...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > > Oh and one more thing. I'm planning to add this feature in small chunks of
> > > > > > > PRs because security is a super hairy area. That way reviewers can more
> > > > > > > easily grasp the concept.
> > > > > >
> > > > > > On Fri, 21 Jan 2022, 18:03 David Morávek, <d...@apache.org>
> wrote:
> > > > > >
> > > > > > > Hi Gabor,
> > > > > > >
> > > > > > > thanks for drafting the FLIP, I think having a solid Kerberos
> > > support
> > > > > is
> > > > > > > crucial for many enterprise deployments.
> > > > > > >
> > > > > > > I have multiple questions regarding the implementation (note
> > that I
> > > > > have
> > > > > > > very limited knowledge of Kerberos):
> > > > > > >
> > > > > > > 1) If I understand it correctly, we'll only obtain tokens in
> the
> > > job
> > > > > > > manager and then we'll distribute them via RPC (needs to be
> > > secured).
> > > > > > >
> > > > > > > Can you please outline how the communication will look like? Is
> > the
> > > > > > > DelegationTokenManager going to be a part of the
> ResourceManager?
> > > Can
> > > > > you
> > > > > > > outline it's lifecycle / how it's going to be integrated there?
> > > > > > >
> > > > > > > 2) Do we really need a YARN / k8s specific implementations? Is
> it
> > > > > > possible
> > > > > > > to obtain / renew a token in a generic way? Maybe to rephrase
> > that,
> > > > is
> > > > > it
> > > > > > > possible to implement DelegationTokenManager for the standalone
> > > > Flink?
> > > > > If
> > > > > > > we're able to solve this point, it could be possible to target
> > all
> > > > > > > deployment scenarios with a single implementation.
> > > > > > >
> > > > > > > Best,
> > > > > > > D.
> > > > > > >
> > > > > > > On Fri, Jan 14, 2022 at 3:47 AM Junfan Zhang <
> > > > zuston.sha...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi G
> > > > > > > >
> > > > > > > > Thanks for your explain in detail. I have gotten your
> thoughts,
> > > and
> > > > > any
> > > > > > > > way this proposal
> > > > > > > > is a great improvement.
> > > > > > > >
> > > > > > > > Looking forward to your implementation and i will keep focus
> on
> > > it.
> > > > > > > > Thanks again.
> > > > > > > >
> > > > > > > > Best
> > > > > > > > JunFan.
> > > > > > > > On Jan 13, 2022, 9:20 PM +0800, Gabor Somogyi <
> > > > > > gabor.g.somo...@gmail.com
> > > > > > > >,
> > > > > > > > wrote:
> > > > > > > > > Just to confirm keeping
> > > > "security.kerberos.fetch.delegation-token"
> > > > > is
> > > > > > > > added
> > > > > > > > > to the doc.
> > > > > > > > >
> > > > > > > > > BR,
> > > > > > > > > G
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Jan 13, 2022 at 1:34 PM Gabor Somogyi <
> > > > > > > gabor.g.somo...@gmail.com
> > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi JunFan,
> > > > > > > > > >
> > > > > > > > > > > By the way, maybe this should be added in the migration
> > > plan
> > > > or
> > > > > > > > > > intergation section in the FLIP-211.
> > > > > > > > > >
> > > > > > > > > > Going to add this soon.
> > > > > > > > > >
> > > > > > > > > > > Besides, I have a question that the KDC will collapse
> > when
> > > > the
> > > > > > > > cluster
> > > > > > > > > > reached 200 nodes you described
> > > > > > > > > > in the google doc. Do you have any attachment or
> reference
> > to
> > > > > prove
> > > > > > > it?
> > > > > > > > > >
> > > > > > > > > > "KDC *may* collapse under some circumstances" is the
> proper
> > > > > > wording.
> > > > > > > > > >
> > > > > > > > > > We have several customers who are executing workloads on
> > > > > > Spark/Flink.
> > > > > > > > Most
> > > > > > > > > > of the time I'm facing their
> > > > > > > > > > daily issues which is heavily environment and use-case
> > > > dependent.
> > > > > > > I've
> > > > > > > > > > seen various cases:
> > > > > > > > > > * where the mentioned ~1k nodes were working fine
> > > > > > > > > > * where KDC thought the number of requests are coming
> from
> > > DDOS
> > > > > > > attack
> > > > > > > > so
> > > > > > > > > > discontinued authentication
> > > > > > > > > > * where KDC was simply not responding because of the load
> > > > > > > > > > * where KDC was intermittently had some outage (this was
> > the
> > > > most
> > > > > > > nasty
> > > > > > > > > > thing)
> > > > > > > > > >
> > > > > > > > > > Since you're managing relatively big cluster then you
> know
> > > that
> > > > > KDC
> > > > > > > is
> > > > > > > > not
> > > > > > > > > > only used by Spark/Flink workloads
> > > > > > > > > > but the whole company IT infrastructure is bombing it so
> it
> > > > > really
> > > > > > > > depends
> > > > > > > > > > on other factors too whether KDC is reaching
> > > > > > > > > > it's limit or not. Not sure what kind of evidence are you
> > > > looking
> > > > > > for
> > > > > > > > but
> > > > > > > > > > I'm not authorized to share any information about
> > > > > > > > > > our clients data.
> > > > > > > > > >
> > > > > > > > > > One thing is for sure. The more external system types are
> > > used
> > > > in
> > > > > > > > > > workloads (for ex. HDFS, HBase, Hive, Kafka) which
> > > > > > > > > > are authenticating through KDC the more possibility to
> > reach
> > > > this
> > > > > > > > > > threshold when the cluster is big enough.
> > > > > > > > > >
> > > > > > > > > > All in all this feature is here to help all users never
> > reach
> > > > > this
> > > > > > > > > > limitation.
> > > > > > > > > >
> > > > > > > > > > BR,
> > > > > > > > > > G
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Jan 13, 2022 at 1:00 PM 张俊帆 <
> > zuston.sha...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi G
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your quick reply. I think reserving the
> config
> > > of
> > > > > > > > > > > *security.kerberos.fetch.delegation-token*
> > > > > > > > > > > and simplifying disable the token fetching is a good
> > > idea.By
> > > > > the
> > > > > > > way,
> > > > > > > > > > > maybe this should be added
> > > > > > > > > > > in the migration plan or intergation section in the
> > > FLIP-211.
> > > > > > > > > > >
> > > > > > > > > > > Besides, I have a question that the KDC will collapse
> > when
> > > > the
> > > > > > > > cluster
> > > > > > > > > > > reached 200 nodes you described
> > > > > > > > > > > in the google doc. Do you have any attachment or
> > reference
> > > to
> > > > > > prove
> > > > > > > > it?
> > > > > > > > > > > Because in our internal per-cluster,
> > > > > > > > > > > the nodes reaches > 1000 and KDC looks good. Do i
> missed
> > or
> > > > > > > > misunderstood
> > > > > > > > > > > something? Please correct me.
> > > > > > > > > > >
> > > > > > > > > > > Best
> > > > > > > > > > > JunFan.
> > > > > > > > > > > On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org,
> > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
