We've made the changes in both the doc and the wiki.
Please have a look and let me know if I've missed anything from our
agreement.

G


On Fri, Jan 28, 2022 at 4:04 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Thanks for making the design better! Nothing further to discuss from my
> side.
>
> I've started to reflect the agreement in the FLIP doc.
> Since I don't have access to the wiki, I need to ask Marci to do that,
> which may take some time.
>
> G
>
>
> On Fri, Jan 28, 2022 at 3:52 PM David Morávek <d...@apache.org> wrote:
>
>> Hi,
>>
>> AFAIU an under registration TM is not added to the registered TMs map
>> until
>> > RegistrationResponse ..
>> >
>>
>> I think you're right; with a careful design around threading (delegating
>> update broadcasts to the main thread) plus a synchronous initial update
>> (which would be nice to avoid), this should be doable.
>>
>> Not sure what you mean "we can't register the TM without providing it with
>> > token" but in unsecure configuration registration must happen w/o
>> tokens.
>> >
>>
>> Exactly as you describe it. This was meant only for the "kerberized /
>> secured" cluster case; in other cases we wouldn't enforce a non-null token
>> in the response.
>>
>> I think this is a good idea in general.
>> >
>>
>> +1
>>
>> If you don't have any more thoughts on the RPC / lifecycle part, can you
>> please reflect it in the FLIP?
>>
>> D.
>>
>> On Fri, Jan 28, 2022 at 3:16 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>> > > - Make sure DTs issued by single DTMs are monotonically increasing
>> (can
>> > be
>> > sorted on TM side)
>> >
>> > AFAIU a TM under registration is not added to the registered-TMs map
>> > until the RegistrationResponse, which would contain the initial tokens,
>> > is processed. If that's true, then how is it possible to have a race
>> > with a DTM update, which works on the registered-TMs list?
>> > To be more specific, "taskExecutors" is the map of registered TMs to
>> > which the DTM can send updated tokens, but it doesn't contain a TM that
>> > is still under registration while its RegistrationResponse has not been
>> > processed, right?
>> >
>> > Of course, if the DTM can send an update while the RegistrationResponse
>> > is being processed, then some kind of sorting would be required, and in
>> > that case I would agree.
>> >
>> > - Scope DT updates by the RM ID and ensure that TM only accepts update
>> from
>> > the current leader
>> >
>> > I initially planned this the mentioned way, so agreed.
>> >
>> > - Return initial token with the RegistrationResponse, which should make
>> the
>> > RPC contract bit clearer (ensure that we can't register the TM without
>> > providing it with token)
>> >
>> > I think this is a good idea in general. Not sure what you mean by "we
>> > can't register the TM without providing it with token", but in an
>> > unsecured configuration registration must happen w/o tokens.
>> > All in all, the newly added tokens field must somehow be optional.
>> >
>> > G
>> >
>> >
>> > On Fri, Jan 28, 2022 at 2:22 PM David Morávek <d...@apache.org> wrote:
>> >
>> > > We had a long discussion with Chesnay about the possible edge cases,
>> > > and it basically boils down to the following two scenarios:
>> > >
>> > > 1) There is a possible race condition between TM registration (the
>> > > first DT update) and a token refresh if they happen simultaneously.
>> > > Then the registration might beat the refreshed token. This could be
>> > > easily addressed if DTs could be sorted (e.g. by the expiration time)
>> > > on the TM side. In other words, if there are multiple updates at the
>> > > same time, we need to make sure that we have a deterministic way of
>> > > choosing the latest one.
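A minimal sketch of that deterministic choice (a simplified model with hypothetical names, not the actual Flink classes):

```python
# Simplified model (hypothetical names): the TM keeps whichever delegation
# token has the latest expiration, so a stale token delivered via the
# registration path cannot beat a fresher broadcast.
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DelegationToken:
    value: bytes
    expiration_ms: int


class TokenHolder:
    def __init__(self) -> None:
        self.current: Optional[DelegationToken] = None

    def on_update(self, token: DelegationToken) -> None:
        # Deterministic choice of "the latest one": only accept a token with
        # a strictly later expiration than the current one.
        if self.current is None or token.expiration_ms > self.current.expiration_ms:
            self.current = token
```

With this rule the outcome is the same regardless of the order in which simultaneous updates arrive.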
>> > >
>> > > One idea by Chesnay that popped up during this discussion was whether
>> we
>> > > could simply return the initial token with the RegistrationResponse to
>> > > avoid making an extra call during the TM registration.
>> > >
>> > > 2) When the RM leadership changes (e.g. because a ZooKeeper session
>> > > times out), there might be a race condition where the old RM is
>> > > shutting down and updates the tokens, so it might again beat the
>> > > registration token of the new RM. This could be avoided if we scope
>> > > the token by _ResourceManagerId_ and only accept updates for the
>> > > current leader (basically we'd have an extra parameter to the
>> > > _updateDelegationToken_ method).
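A sketch of that fencing idea (hypothetical names, not the actual RPC signatures):

```python
# Sketch: token updates carry the id of the ResourceManager that sent them,
# and the TM drops any update that is not from the leader it currently knows.
class TokenReceiver:
    def __init__(self, leader_rm_id: str) -> None:
        self.leader_rm_id = leader_rm_id
        self.tokens = None

    def update_delegation_token(self, rm_id: str, tokens: bytes) -> bool:
        # A late update from an old RM that is shutting down carries a stale
        # id and is rejected instead of overwriting the new leader's token.
        if rm_id != self.leader_rm_id:
            return False
        self.tokens = tokens
        return True
```

The rejection is cheap on the TM side and needs no coordination beyond the leader id the TM already tracks.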
>> > >
>> > > -
>> > >
>> > > The DTM is way simpler than, for example, slot management, which can
>> > > receive updates from the JobMaster that the RM might not know about.
>> > >
>> > > So if you want to go down the path you're describing, it should be
>> > > doable, and we'd propose the following to cover all cases:
>> > >
>> > > - Make sure DTs issued by a single DTM are monotonically increasing
>> > > (can be sorted on the TM side)
>> > > - Scope DT updates by the RM ID and ensure that the TM only accepts
>> > > updates from the current leader
>> > > - Return the initial token with the RegistrationResponse, which should
>> > > make the RPC contract a bit clearer (ensure that we can't register the
>> > > TM without providing it with a token)
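The third bullet could look roughly like this (hypothetical names; the token field stays optional so the unsecured case keeps working):

```python
# Sketch of the proposed registration contract: the response carries the
# initial tokens, and the field is optional so an unsecured cluster can still
# register TMs without any token.
from dataclasses import dataclass
from typing import Optional


@dataclass
class RegistrationResponse:
    success: bool
    initial_tokens: Optional[bytes]  # None in an unsecured configuration


def register_task_manager(secured: bool, tokens: Optional[bytes]) -> RegistrationResponse:
    if secured and tokens is None:
        # In the kerberized case the TM cannot be registered without tokens.
        return RegistrationResponse(success=False, initial_tokens=None)
    return RegistrationResponse(success=True, initial_tokens=tokens)
```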
>> > >
>> > > Any thoughts?
>> > >
>> > >
>> > > On Fri, Jan 28, 2022 at 10:53 AM Gabor Somogyi <
>> > gabor.g.somo...@gmail.com>
>> > > wrote:
>> > >
>> > > > Thanks for investing your time!
>> > > >
>> > > > The first two bullet points are clear.
>> > > > If there is a chance that a TM can get into an inconsistent state,
>> > > > then I agree with the third bullet point.
>> > > > But before we agree on that, I would like to learn something new and
>> > > > understand how it is possible that a TM gets corrupted. (In Spark
>> > > > I've never seen such a thing, and there is no mechanism to fix it,
>> > > > but Flink is definitely not Spark.)
>> > > >
>> > > > Here is my understanding:
>> > > > * The DTM pushes newly obtained DTs to the TMs, and if any exception
>> > > > occurs, a retry happens after "security.kerberos.tokens.retry-wait".
>> > > > This means the DTM keeps retrying until the new DTs can be sent to
>> > > > all registered TMs.
>> > > > * New TM registration must fail if "updateDelegationToken" fails.
>> > > > * "updateDelegationToken" fails consistently, like a DB (at least I
>> > > > plan to implement it that way).
>> > > > If DTs arrive on the TM side, then a single
>> > > > "UserGroupInformation.getCurrentUser.addCredentials"
>> > > > call is made, which I've never seen fail.
>> > > > * I hope no other code parts touch the existing DTs within the JVM.
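The first bullet can be sketched as follows (hypothetical names; the gateway call stands in for the real RPC):

```python
# Sketch of the retry behavior described above: push the new DTs to every
# registered TM; if any push fails, schedule another attempt after
# security.kerberos.tokens.retry-wait.
def push_tokens(task_managers, tokens, schedule_retry, retry_wait_s):
    failed = [tm for tm in task_managers if not tm.update_delegation_token(tokens)]
    if failed:
        # Retry later; the DTM keeps doing this until every TM has the tokens.
        schedule_retry(retry_wait_s)
    return failed
```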
>> > > >
>> > > > I would like to emphasize that I'm not against adding it; I just
>> > > > want to see what kinds of problems we are facing.
>> > > > It would make it easier to catch bugs early and help with
>> > > > maintenance.
>> > > >
>> > > > All in all, I would buy the idea of adding the third bullet if we
>> > > > foresee the need.
>> > > >
>> > > > G
>> > > >
>> > > >
>> > > > On Fri, Jan 28, 2022 at 10:07 AM David Morávek <d...@apache.org>
>> > wrote:
>> > > >
>> > > > > Hi Gabor,
>> > > > >
>> > > > > This is definitely headed in the right direction, +1.
>> > > > >
>> > > > > I think we still need to have a safeguard in case one of the TMs
>> > > > > gets into an inconsistent state though, which will also eliminate
>> > > > > the need for implementing a custom retry mechanism (when the
>> > > > > _updateDelegationToken_ call fails for some reason).
>> > > > >
>> > > > > We already have this safeguard in place for the slot pool (in case
>> > > > > some slots are in an inconsistent state, e.g. we haven't freed
>> > > > > them for some reason) and for the partition tracker, which could
>> > > > > simply be enhanced. This is done via a periodic heartbeat from the
>> > > > > TaskManagers to the ResourceManager that contains a report about
>> > > > > the state of these two components (from the TM perspective), so
>> > > > > the RM can reconcile their state if necessary.
>> > > > >
>> > > > > I don't think adding an additional field to
>> > > > > _TaskExecutorHeartbeatPayload_ should be a concern, as we only
>> > > > > heartbeat every ~10s by default and the new field would be small
>> > > > > compared to the rest of the existing payload. Also, the heartbeat
>> > > > > doesn't need to contain the whole DT, just some identifier that
>> > > > > signals whether the TM uses the right one, which could be
>> > > > > significantly smaller.
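One way such an identifier could work (an assumed scheme, purely for illustration):

```python
# Sketch: the heartbeat payload only carries a short hash of the TM's current
# token, and the RM re-pushes the token when the reported hash differs from
# the hash of its own latest token.
import hashlib


def token_id(token: bytes) -> str:
    # A truncated digest is enough to detect a mismatch with high confidence.
    return hashlib.sha256(token).hexdigest()[:16]


def needs_reconciliation(rm_token: bytes, reported_id: str) -> bool:
    return token_id(rm_token) != reported_id
```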
>> > > > >
>> > > > > This is still a PUSH-based approach, as the RM would again call
>> > > > > the newly introduced _updateDelegationToken_ when it encounters an
>> > > > > inconsistency (e.g. due to a temporary network partition / a race
>> > > > > condition we didn't test for / some other scenario we didn't think
>> > > > > about). In practice these inconsistencies are super hard to avoid
>> > > > > and reason about (and unfortunately, yes, we see them happen from
>> > > > > time to time), so reusing the existing mechanism that is designed
>> > > > > for this exact problem simplifies things.
>> > > > >
>> > > > > To sum this up, we'd have three code paths for calling
>> > > > > _updateDelegationToken_:
>> > > > > 1) When the TM registers, we push the token (if the DTM already
>> > > > > has it) to the TM
>> > > > > 2) When the DTM obtains a new token, it broadcasts it to all
>> > > > > currently connected TMs
>> > > > > 3) When a TM gets out of sync, the DTM reconciles its state
>> > > > >
>> > > > > WDYT?
>> > > > >
>> > > > > Best,
>> > > > > D.
>> > > > >
>> > > > >
>> > > > > On Wed, Jan 26, 2022 at 9:03 PM David Morávek <d...@apache.org>
>> > wrote:
>> > > > >
>> > > > > > Thanks for the update, I'll go over it tomorrow.
>> > > > > >
>> > > > > > On Wed, Jan 26, 2022 at 5:33 PM Gabor Somogyi <
>> > > > gabor.g.somo...@gmail.com
>> > > > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > >> Hi All,
>> > > > > >>
>> > > > > >> Since it has turned out that the DTM can't be added as a member
>> > > > > >> of JobMaster
>> > > > > >> <
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
>> > > > > >> >
>> > > > > >> I've come up with a better proposal.
>> > > > > >> David, thanks for pointing this out; you've caught a bug in the
>> > > > > >> early phase!
>> > > > > >>
>> > > > > >> Namely ResourceManager
>> > > > > >> <
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124
>> > > > > >> >
>> > > > > >> is
>> > > > > >> a single instance class where DTM can be added as member
>> variable.
>> > > > > >> It has a list of all already registered TMs and new TM
>> > registration
>> > > is
>> > > > > >> also
>> > > > > >> happening here.
>> > > > > >> To be more specific, the following logic can be added:
>> > > > > >> * Create new DTM instance in ResourceManager
>> > > > > >> <
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124
>> > > > > >> >
>> > > > > >> and
>> > > > > >> start it (a recurring thread that obtains new tokens)
>> > > > > >> * Add a new function named "updateDelegationTokens" to
>> > > > > TaskExecutorGateway
>> > > > > >> <
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java#L54
>> > > > > >> >
>> > > > > >> * Call "updateDelegationTokens" on all registered TMs to
>> propagate
>> > > new
>> > > > > DTs
>> > > > > >> * In case of a new TM registration, call "updateDelegationTokens"
>> > > > > >> before the registration succeeds, to set up the new TM properly
>> > > > > >>
>> > > > > >> This way:
>> > > > > >> * only a single DTM would live within a cluster, which is the
>> > > > > >> expected behavior
>> > > > > >> * the DTM is going to be added to a central place where all
>> > > > > >> deployment targets can make use of it
>> > > > > >> * DTs are going to be pushed to the TMs, which would generate
>> > > > > >> less network traffic than a pull-based approach
>> > > > > >> (please see my previous mail where I've described both
>> approaches)
>> > > > > >> * HA scenario is going to be consistent because such
>> > > > > >> <
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1069
>> > > > > >> >
>> > > > > >> a solution can be added to "updateDelegationTokens"
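The proposed flow can be sketched like this (hypothetical names; `ResourceManagerSketch` and its methods are illustrative, not the actual Flink API):

```python
# Sketch: one DTM lives in the ResourceManager, new tokens are broadcast to
# all registered TMs, and a registering TM gets the current tokens pushed
# before its registration completes.
class ResourceManagerSketch:
    def __init__(self, current_tokens: bytes) -> None:
        self.current_tokens = current_tokens
        self.task_executors = {}  # registered TMs only

    def on_new_tokens(self, tokens: bytes) -> None:
        # Called by the DTM's recurring token-obtain thread.
        self.current_tokens = tokens
        for gateway in self.task_executors.values():
            gateway.update_delegation_tokens(tokens)

    def register_task_executor(self, tm_id: str, gateway) -> None:
        # Set up the new TM before its registration succeeds.
        gateway.update_delegation_tokens(self.current_tokens)
        self.task_executors[tm_id] = gateway
```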
>> > > > > >>
>> > > > > >> @David or all others, please share whether you agree with this
>> > > > > >> or have a better idea/suggestion.
>> > > > > >>
>> > > > > >> BR,
>> > > > > >> G
>> > > > > >>
>> > > > > >>
>> > > > > >> On Tue, Jan 25, 2022 at 11:00 AM Gabor Somogyi <
>> > > > > gabor.g.somo...@gmail.com
>> > > > > >> >
>> > > > > >> wrote:
>> > > > > >>
>> > > > > >> > First of all, thanks for investing your time and helping me
>> > > > > >> > out. As I can see, you have pretty solid knowledge of the RPC
>> > > > > >> > area. I would like to rely on your knowledge, since I'm still
>> > > > > >> > learning this part.
>> > > > > >> >
>> > > > > >> > > - Do we need to introduce a new RPC method or can we for
>> > example
>> > > > > >> > piggyback
>> > > > > >> > on heartbeats?
>> > > > > >> >
>> > > > > >> > I'm fine with either solution, but one thing is important
>> > > > > >> > conceptually. There are fundamentally two ways tokens can be
>> > > > > >> > updated:
>> > > > > >> > - Push: When there are new DTs, the JM JVM pushes them to the
>> > > > > >> > TM JVMs. This is the preferred one, since only a tiny amount
>> > > > > >> > of control logic is needed.
>> > > > > >> > - Pull: Each TM would poll the JM for new tokens and decide on
>> > > > > >> > its own whether the DTs need to be updated or not.
>> > > > > >> > As you've mentioned, some ID would need to be generated here,
>> > > > > >> > and it would generate quite a bit of additional network
>> > > > > >> > traffic, which can definitely be avoided.
>> > > > > >> > As a final thought, in Spark we had this kind of DT
>> > > > > >> > propagation logic and we had major issues with it.
>> > > > > >> >
>> > > > > >> > So, all in all, the DTM needs to obtain new tokens, and there
>> > > > > >> > must be a way to send this data from the JM to all TMs.
>> > > > > >> >
>> > > > > >> > > - What delivery semantics are we looking for? (what if
>> we're
>> > > only
>> > > > > >> able to
>> > > > > >> > update subset of TMs / what happens if we exhaust retries /
>> > should
>> > > > we
>> > > > > >> even
>> > > > > >> > have the retry mechanism whatsoever) - I have a feeling that
>> > > somehow
>> > > > > >> > leveraging the existing heartbeat mechanism could help to
>> answer
>> > > > these
>> > > > > >> > questions
>> > > > > >> >
>> > > > > >> > Let's go through these questions one by one.
>> > > > > >> > > What delivery semantics are we looking for?
>> > > > > >> >
>> > > > > >> > The DTM must receive an exception when at least one TM was
>> > > > > >> > not able to get the DTs.
>> > > > > >> >
>> > > > > >> > > what if we're only able to update subset of TMs?
>> > > > > >> >
>> > > > > >> > In such a case the DTM will reschedule the token obtainment
>> > > > > >> > after "security.kerberos.tokens.retry-wait".
>> > > > > >> >
>> > > > > >> > > what happens if we exhaust retries?
>> > > > > >> >
>> > > > > >> > There is no retry limit. In the default configuration, tokens
>> > > > > >> > need to be re-obtained after one day.
>> > > > > >> > The DTM tries to obtain new tokens after 1 day * 0.75
>> > > > > >> > (security.kerberos.tokens.renewal-ratio) = 18 hours.
>> > > > > >> > When that fails, it retries after
>> > > > > >> > "security.kerberos.tokens.retry-wait", which is 1 hour by
>> > > > > >> > default.
>> > > > > >> > If it never succeeds, an authentication error is going to
>> > > > > >> > happen on the TM side and the workload is going to stop.
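The schedule above, written out (assuming the default values mentioned):

```python
# The next token-obtain attempt is scheduled at token-lifetime * renewal-ratio.
def next_obtain_delay_hours(token_lifetime_hours: float, renewal_ratio: float) -> float:
    return token_lifetime_hours * renewal_ratio

# With a 24 h token lifetime and the default renewal ratio of 0.75, new tokens
# are obtained after 18 hours; on failure the DTM retries every
# security.kerberos.tokens.retry-wait (1 hour by default) until the tokens
# actually expire.
```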
>> > > > > >> >
>> > > > > >> > > should we even have the retry mechanism whatsoever?
>> > > > > >> >
>> > > > > >> > Yes, because there are always temporary cluster issues.
>> > > > > >> >
>> > > > > >> > > What does it mean for the running application (how does
>> this
>> > > look
>> > > > > like
>> > > > > >> > from
>> > > > > >> > the user perspective)? As far as I remember the logs are only
>> > > > > collected
>> > > > > >> > ("aggregated") after the container is stopped, is that
>> correct?
>> > > > > >> >
>> > > > > >> > With the default config it works like that, but it can be
>> > > > > >> > forced to aggregate at specific intervals.
>> > > > > >> > A useful feature is forcing YARN to aggregate logs while the
>> job
>> > > is
>> > > > > >> still
>> > > > > >> > running.
>> > > > > >> > For long-running jobs such as streaming jobs, this is
>> > invaluable.
>> > > To
>> > > > > do
>> > > > > >> > this,
>> > > > > >> >
>> > yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds
>> > > > must
>> > > > > >> be
>> > > > > >> > set to a non-negative value.
>> > > > > >> > When this is set, a timer will be set for the given duration,
>> > and
>> > > > > >> whenever
>> > > > > >> > that timer goes off,
>> > > > > >> > log aggregation will run on new files.
>> > > > > >> >
>> > > > > >> > > I think
>> > > > > >> > this topic should get its own section in the FLIP (having
>> some
>> > > cross
>> > > > > >> > reference to YARN ticket would be really useful, but I'm not
>> > sure
>> > > if
>> > > > > >> there
>> > > > > >> > are any).
>> > > > > >> >
>> > > > > >> > I think this is important knowledge, but this FLIP is not
>> > > > > >> > touching the already existing behavior.
>> > > > > >> > DTs are set on the AM container, which is renewed by YARN
>> > > > > >> > until that is not possible anymore.
>> > > > > >> > No new code is going to change this limitation. BTW, there is
>> > > > > >> > no JIRA for this.
>> > > > > >> > If you think it's worth writing this down, then I think the
>> > > > > >> > right place is the official security doc area, as a caveat.
>> > > > > >> >
>> > > > > >> > > If we split the FLIP into two parts / sections that I've
>> > > > suggested,
>> > > > > I
>> > > > > >> > don't
>> > > > > >> > really think that you need to explicitly test for each
>> > deployment
>> > > > > >> scenario
>> > > > > >> > / cluster framework, because the DTM part is completely
>> > > independent
>> > > > of
>> > > > > >> the
>> > > > > >> > deployment target. Basically this is what I'm aiming for with
>> > > > "making
>> > > > > it
>> > > > > >> > work with the standalone" (as simple as starting a new java
>> > > process)
>> > > > > >> Flink
>> > > > > >> > first (which is also how most people deploy streaming
>> > application
>> > > on
>> > > > > k8s
>> > > > > >> > and the direction we're pushing forward with the
>> auto-scaling /
>> > > > > reactive
>> > > > > >> > mode initiatives).
>> > > > > >> >
>> > > > > >> > I see your point and agree with the main direction. k8s is
>> > > > > >> > the megatrend which most people will use sooner or later. I'm
>> > > > > >> > not 100% sure what kind of split you suggest, but in my view
>> > > > > >> > the main target is to add this feature, and I'm open to any
>> > > > > >> > logical work ordering.
>> > > > > >> > Please share the specific details and we'll work it out...
>> > > > > >> >
>> > > > > >> > G
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > On Mon, Jan 24, 2022 at 3:04 PM David Morávek <
>> d...@apache.org>
>> > > > > wrote:
>> > > > > >> >
>> > > > > >> >> >
>> > > > > >> >> > Could you point to a code where you think it could be
>> added
>> > > > > exactly?
>> > > > > >> A
>> > > > > >> >> > helping hand is welcome here 🙂
>> > > > > >> >> >
>> > > > > >> >>
>> > > > > >> >> I think you can take a look at
>> > _ResourceManagerPartitionTracker_
>> > > > [1]
>> > > > > >> which
>> > > > > >> >> seems to have somewhat similar properties to the DTM.
>> > > > > >> >>
>> > > > > >> >> One topic that needs to be addressed there is how the RPC
>> with
>> > > the
>> > > > > >> >> _TaskExecutorGateway_ should look like.
>> > > > > >> >> - Do we need to introduce a new RPC method or can we for
>> > example
>> > > > > >> piggyback
>> > > > > >> >> on heartbeats?
>> > > > > >> >> - What delivery semantics are we looking for? (what if we're
>> > only
>> > > > > able
>> > > > > >> to
>> > > > > >> >> update subset of TMs / what happens if we exhaust retries /
>> > > should
>> > > > we
>> > > > > >> even
>> > > > > >> >> have the retry mechanism whatsoever) - I have a feeling that
>> > > > somehow
>> > > > > >> >> leveraging the existing heartbeat mechanism could help to
>> > answer
>> > > > > these
>> > > > > >> >> questions
>> > > > > >> >>
>> > > > > >> >> In short, after DT reaches its max lifetime then log
>> > aggregation
>> > > > > stops
>> > > > > >> >> >
>> > > > > >> >>
>> > > > > >> >> What does it mean for the running application (what does
>> > > > > >> >> this look like from the user perspective)? As far as I
>> > > > > >> >> remember, the logs are only collected ("aggregated") after
>> > > > > >> >> the container is stopped; is that correct? I think
>> > > > > >> >> this topic should get its own section in the FLIP (having
>> some
>> > > > cross
>> > > > > >> >> reference to YARN ticket would be really useful, but I'm not
>> > sure
>> > > > if
>> > > > > >> there
>> > > > > >> >> are any).
>> > > > > >> >>
>> > > > > >> >> All deployment modes (per-job, per-app, ...) are planned to
>> be
>> > > > tested
>> > > > > >> and
>> > > > > >> >> > expect to work with the initial implementation however not
>> > all
>> > > > > >> >> deployment
>> > > > > >> >> > targets (k8s, local, ...
>> > > > > >> >> >
>> > > > > >> >>
>> > > > > >> >> If we split the FLIP into two parts / sections that I've
>> > > > suggested, I
>> > > > > >> >> don't
>> > > > > >> >> really think that you need to explicitly test for each
>> > deployment
>> > > > > >> scenario
>> > > > > >> >> / cluster framework, because the DTM part is completely
>> > > independent
>> > > > > of
>> > > > > >> the
>> > > > > >> >> deployment target. Basically this is what I'm aiming for
>> with
>> > > > "making
>> > > > > >> it
>> > > > > >> >> work with the standalone" (as simple as starting a new java
>> > > > process)
>> > > > > >> Flink
>> > > > > >> >> first (which is also how most people deploy streaming
>> > application
>> > > > on
>> > > > > >> k8s
>> > > > > >> >> and the direction we're pushing forward with the
>> auto-scaling /
>> > > > > >> reactive
>> > > > > >> >> mode initiatives).
>> > > > > >> >>
>> > > > > >> >> The whole integration with YARN (let's forget about log
>> > > > > >> >> aggregation for a moment) / k8s-native only boils down to
>> > > > > >> >> how we make the keytab file local to the JobManager so the
>> > > > > >> >> DTM can read it, so it's basically built on top of that. The
>> > > > > >> >> only special thing that needs to be tested there is the
>> > > > > >> >> "keytab distribution" code path.
>> > > > > >> >>
>> > > > > >> >> [1]
>> > > > > >> >>
>> > > > > >> >>
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
>> > > > > >> >>
>> > > > > >> >> Best,
>> > > > > >> >> D.
>> > > > > >> >>
>> > > > > >> >> On Mon, Jan 24, 2022 at 12:35 PM Gabor Somogyi <
>> > > > > >> gabor.g.somo...@gmail.com
>> > > > > >> >> >
>> > > > > >> >> wrote:
>> > > > > >> >>
>> > > > > >> >> > > There is a separate JobMaster for each job
>> > > > > >> >> > within a Flink cluster and each JobMaster only has a
>> partial
>> > > view
>> > > > > of
>> > > > > >> the
>> > > > > >> >> > task managers
>> > > > > >> >> >
>> > > > > >> >> > Good point! I've had a deeper look and you're right. We
>> > > > definitely
>> > > > > >> need
>> > > > > >> >> to
>> > > > > >> >> > find another place.
>> > > > > >> >> >
>> > > > > >> >> > > Related per-cluster or per-job keytab:
>> > > > > >> >> >
>> > > > > >> >> > In the current code a per-cluster keytab is implemented,
>> > > > > >> >> > and I intend to keep it like this within this FLIP. The
>> > > > > >> >> > reason is simple: tokens on the TM side can be stored
>> > > > > >> >> > within the UserGroupInformation (UGI) structure, which is
>> > > > > >> >> > global. I'm not saying it's impossible to change that, but
>> > > > > >> >> > I think this is a complexity which the initial
>> > > > > >> >> > implementation is not required to contain. Additionally,
>> > > > > >> >> > we've not seen such a need from the user side. If the need
>> > > > > >> >> > arises later on, another FLIP on this topic can be created
>> > > > > >> >> > and discussed. Proper multi-UGI handling within a single
>> > > > > >> >> > JVM is a topic where several rounds of deep dives with the
>> > > > > >> >> > Hadoop/YARN guys are required.
>> > > > > >> >> >
>> > > > > >> >> > > single DTM instance embedded with
>> > > > > >> >> > the ResourceManager (the Flink component)
>> > > > > >> >> >
>> > > > > >> >> > Could you point to the code where you think it could be
>> > > > > >> >> > added, exactly? A helping hand is welcome here 🙂
>> > > > > >> >> >
>> > > > > >> >> > > Then the single (initial) implementation should work
>> with
>> > all
>> > > > the
>> > > > > >> >> > deployments modes out of the box (which is not what the
>> FLIP
>> > > > > >> suggests).
>> > > > > >> >> Is
>> > > > > >> >> > that correct?
>> > > > > >> >> >
>> > > > > >> >> > All deployment modes (per-job, per-app, ...) are planned
>> to
>> > be
>> > > > > tested
>> > > > > >> >> and
>> > > > > >> >> > are expected to work with the initial implementation, however not
>> > all
>> > > > > >> >> deployment
>> > > > > >> >> > targets (k8s, local, ...) are not intended to be tested.
>> Per
>> > > > > >> deployment
>> > > > > >> >> > target new jira needs to be created where I expect small
>> > number
>> > > > of
>> > > > > >> codes
>> > > > > >> >> > needs to be added and relatively expensive testing effort
>> is
>> > > > > >> required.
>> > > > > >> >> >
>> > > > > >> >> > > I've taken a look into the prototype and in the
>> > > > > >> >> "YarnClusterDescriptor"
>> > > > > >> >> > you're injecting a delegation token into the AM [1]
>> (that's
>> > > > > obtained
>> > > > > >> >> using
>> > > > > >> >> > the provided keytab). If I understand this correctly from
>> > > > previous
>> > > > > >> >> > discussion / FLIP, this is to support log aggregation and
>> DT
>> > > has
>> > > > a
>> > > > > >> >> limited
>> > > > > >> >> > validity. How is this DT going to be renewed?
>> > > > > >> >> >
>> > > > > >> >> > You're sharp and have touched on a limitation which Spark
>> > > > > >> >> > has too. In short, after a DT reaches its max lifetime,
>> > > > > >> >> > log aggregation stops. I had several deep-dive rounds with
>> > > > > >> >> > the YARN guys during my Spark years because I wanted to
>> > > > > >> >> > fill this gap. They couldn't provide us any way to
>> > > > > >> >> > re-inject the newly obtained DT, so in the end I gave up
>> > > > > >> >> > on it.
>> > > > > >> >> >
>> > > > > >> >> > BR,
>> > > > > >> >> > G
>> > > > > >> >> >
>> > > > > >> >> >
>> > > > > >> >> > On Mon, 24 Jan 2022, 11:00 David Morávek, <
>> d...@apache.org>
>> > > > wrote:
>> > > > > >> >> >
>> > > > > >> >> > > Hi Gabor,
>> > > > > >> >> > >
>> > > > > >> >> > > There is actually a huge difference between JobManager
>> > > > > >> >> > > (process) and JobMaster (job coordinator). The naming is
>> > > > > >> >> > > unfortunately a bit misleading here, for historical
>> > > > > >> >> > > reasons. There is a separate JobMaster for each job
>> > > > > >> >> > > within a Flink cluster, and each JobMaster only has a
>> > > > > >> >> > > partial view of the task managers (depending on where
>> > > > > >> >> > > the slots for a particular job are allocated). This
>> > > > > >> >> > > means that you'll end up with N
>> > > > > >> >> > > "DelegationTokenManagers" competing with each other
>> > > > > >> >> > > (N = number of running jobs in the cluster).
>> > > > > >> >> > >
>> > > > > >> >> > > This makes me think we're mixing two abstraction levels
>> > here:
>> > > > > >> >> > >
>> > > > > >> >> > > a) Per-cluster delegation tokens
>> > > > > >> >> > > - Simpler approach, it would involve a single DTM
>> instance
>> > > > > embedded
>> > > > > >> >> with
>> > > > > >> >> > > the ResourceManager (the Flink component)
>> > > > > >> >> > > b) Per-job delegation tokens
>> > > > > >> >> > > - More complex approach, but could be more flexible from
>> > the
>> > > > user
>> > > > > >> >> side of
>> > > > > >> >> > > things.
>> > > > > >> >> > > - Multiple DTM instances that are bound to the JobMaster
>> > > > > >> >> > > lifecycle. Delegation tokens are attached to the
>> > > > > >> >> > > particular slots that are executing the job's tasks
>> > > > > >> >> > > instead of to the whole task manager (a TM could be
>> > > > > >> >> > > executing multiple jobs with different tokens).
>> > > > > >> >> > > - The question is which keytab should be used for the
>> > > > clustering
>> > > > > >> >> > framework,
>> > > > > >> >> > > to support log aggregation on YARN (an extra keytab,
>> keytab
>> > > > that
>> > > > > >> comes
>> > > > > >> >> > with
>> > > > > >> >> > > the first job?)
>> > > > > >> >> > >
>> > > > > >> >> > > I think these are the things that need to be clarified
>> in
>> > the
>> > > > > FLIP
>> > > > > >> >> before
>> > > > > >> >> > > proceeding.
>> > > > > >> >> > >
>> > > > > >> >> > > A follow-up question for getting a better understanding
>> > > > > >> >> > > of where this should be headed: Are there any use cases
>> > > > > >> >> > > where a user may want to use different keytabs with each
>> > > > > >> >> > > job, or are we fine with using a cluster-wide keytab? If
>> > > > > >> >> > > we go with per-cluster keytabs, is it OK that all jobs
>> > > > > >> >> > > submitted into this cluster can access it (even future
>> > > > > >> >> > > ones)? Should this be a security concern?
>> > > > > >> >> > >
>> > > > > >> >> > > > Presume you thought I would implement a new class with
>> > > > > >> >> > > > the JobManager name. The plan is not that.
>> > > > > >> >> > > >
>> > > > > >> >> > >
>> > > > > >> >> > > I've never suggested such a thing.
>> > > > > >> >> > >
>> > > > > >> >> > >
>> > > > > >> >> > > > No. That said earlier DT handling is planned to be
>> done
>> > > > > >> completely
>> > > > > >> >> in
>> > > > > >> >> > > > Flink. DTM has a renewal thread which re-obtains
>> tokens
>> > in
>> > > > the
>> > > > > >> >> proper
>> > > > > >> >> > > time
>> > > > > >> >> > > > when needed.
>> > > > > >> >> > > >
>> > > > > >> >> > >
>> > > > > >> >> > > Then the single (initial) implementation should work
>> with
>> > all
>> > > > the
>> > > > > >> >> > > deployment modes out of the box (which is not what the
>> > FLIP
>> > > > > >> >> suggests).
>> > > > > >> >> > Is
>> > > > > >> >> > > that correct?
>> > > > > >> >> > >
>> > > > > >> >> > > If the cluster framework also requires delegation tokens
>> > for
>> > > > > its
>> > > > > >> >> inner
>> > > > > >> >> > > workings (IMO this only applies to YARN), it might
>> need
>> > an
>> > > > > extra
>> > > > > >> >> step
>> > > > > >> >> > > (injecting the token into the application master container).
>> > > > > >> >> > >
>> > > > > >> >> > > Separating the individual layers (actual Flink cluster -
>> > > > > basically
>> > > > > >> >> making
>> > > > > >> >> > > this work with a standalone deployment / "cluster
>> > > framework" -
>> > > > > >> >> support
>> > > > > >> >> > for
>> > > > > >> >> > > YARN log aggregation) in the FLIP would be useful.
>> > > > > >> >> > >
>> > > > > >> >> > > Reading the linked Spark readme could be useful.
>> > > > > >> >> > > >
>> > > > > >> >> > >
>> > > > > >> >> > > I've read that, but please be patient with the
>> questions,
>> > > > > Kerberos
>> > > > > >> is
>> > > > > >> >> not
>> > > an easy topic to get into and I've had very little
>> > contact
>> > > > with
>> > > > > >> it
>> > > > > >> >> in
>> > > > > >> >> > the
>> > > > > >> >> > > past.
>> > > > > >> >> > >
>> > > > > >> >> > >
>> > > > > >> >> > >
>> > > > > >> >> >
>> > > > > >> >>
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
>> > > > > >> >> > > >
>> > > > > >> >> > >
>> > > > > >> >> > > I've taken a look into the prototype and in the
>> > > > > >> >> "YarnClusterDescriptor"
>> > > > > >> >> > > you're injecting a delegation token into the AM [1]
>> (that's
>> > > > > >> obtained
>> > > > > >> >> > using
>> > > > > >> >> > > the provided keytab). If I understand this correctly
>> from
>> > > > > previous
>> > > > > >> >> > > discussion / FLIP, this is to support log aggregation
>> and
>> > DT
>> > > > has
>> > > > > a
>> > > > > >> >> > limited
>> > > > > >> >> > > validity. How is this DT going to be renewed?
>> > > > > >> >> > >
>> > > > > >> >> > > [1]
>> > > > > >> >> > >
>> > > > > >> >> > >
>> > > > > >> >> >
>> > > > > >> >>
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/gaborgsomogyi/flink/commit/8ab75e46013f159778ccfce52463e7bc63e395a9#diff-02416e2d6ca99e1456f9c3949f3d7c2ac523d3fe25378620c09632e4aac34e4eR1261
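To make the AM token-injection step concrete: on YARN the client serializes the obtained tokens and attaches them to the application master's launch context (in real code via Hadoop's `Credentials#writeTokenStorageToStream` and `ContainerLaunchContext#setTokens`), and the container reads them back on startup. The sketch below only illustrates that serialize/attach/deserialize roundtrip with an invented wire format and made-up class names; it is not the prototype's actual code path:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Roundtrip illustration of "tokens in the AM launch context". The real YARN
 * mechanism serializes a Credentials object; this wire format is invented.
 */
public final class LaunchContextTokensSketch {

    public static ByteBuffer serialize(Map<String, byte[]> tokens) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(tokens.size());
            for (Map.Entry<String, byte[]> e : tokens.entrySet()) {
                out.writeUTF(e.getKey());          // token alias
                out.writeInt(e.getValue().length); // opaque token bytes follow
                out.write(e.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex); // cannot happen on in-memory streams
        }
        return ByteBuffer.wrap(bos.toByteArray()); // what setTokens(...) would receive
    }

    public static Map<String, byte[]> deserialize(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        Map<String, byte[]> tokens = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String alias = in.readUTF();
                byte[] value = new byte[in.readInt()];
                in.readFully(value);
                tokens.put(alias, value);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return tokens;
    }

    public static void main(String[] args) {
        Map<String, byte[]> tokens = new LinkedHashMap<>();
        tokens.put("HDFS_DELEGATION_TOKEN", new byte[] {1, 2, 3});
        ByteBuffer onTheWire = serialize(tokens);                  // client side
        Map<String, byte[]> inContainer = deserialize(onTheWire);  // AM side
        System.out.println(inContainer.keySet());
    }
}
```

Note this only covers handing the initial token to the AM; the renewal question raised above is separate, since whatever is baked into the launch context is a one-shot snapshot.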
>> > > > > >> >> > >
>> > > > > >> >> > > Best,
>> > > > > >> >> > > D.
>> > > > > >> >> > >
>> > > > > >> >> > > On Fri, Jan 21, 2022 at 9:35 PM Gabor Somogyi <
>> > > > > >> >> gabor.g.somo...@gmail.com
>> > > > > >> >> > >
>> > > > > >> >> > > wrote:
>> > > > > >> >> > >
>> > > > > >> >> > > > Here is the exact class; I'm on mobile so haven't had a
>> > look
>> > > at
>> > > > > the
>> > > > > >> >> exact
>> > > > > >> >> > > > class name:
>> > > > > >> >> > > >
>> > > > > >> >> > > >
>> > > > > >> >> > >
>> > > > > >> >> >
>> > > > > >> >>
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
>> > > > > >> >> > > > That keeps track of TMs where the tokens can be sent
>> to.
>> > > > > >> >> > > >
>> > > > > >> >> > > > > My feeling would be that we shouldn't really
>> introduce
>> > a
>> > > > new
>> > > > > >> >> > component
>> > > > > >> >> > > > with
>> > > > > >> >> > > > a custom lifecycle, but rather we should try to
>> > incorporate
>> > > > > this
>> > > > > >> >> into
>> > > > > >> >> > > > existing ones.
>> > > > > >> >> > > >
>> > > > > >> >> > > > Can you be more specific? I presume you thought I would
>> > > > implement
>> > > > > a
>> > > > > >> new
>> > > > > >> >> > > class
>> > > > > >> >> > > > with JobManager name. The plan is not that.
>> > > > > >> >> > > >
>> > > > > >> >> > > > > If I understand this correctly, this means that we
>> then
>> > > > push
>> > > > > >> the
>> > > > > >> >> > token
>> > > > > >> >> > > > renewal logic to YARN.
>> > > > > >> >> > > >
>> > > > > >> >> > > > No. That said earlier DT handling is planned to be
>> done
>> > > > > >> completely
>> > > > > >> >> in
>> > > > > >> >> > > > Flink. DTM has a renewal thread which re-obtains
>> tokens
>> > in
>> > > > the
>> > > > > >> >> proper
>> > > > > >> >> > > time
>> > > > > >> >> > > > when needed. YARN log aggregation is a totally
>> different
>> > > > > feature,
>> > > > > >> >> where
>> > > > > >> >> > > > YARN does the renewal. Log aggregation was an example of
>> why
>> > > the
>> > > > > >> code
>> > > > > >> >> > can't
>> > > > > >> >> > > be
>> > > > > >> >> > > > 100% reusable for all resource managers. Reading the
>> > linked
>> > > > > Spark
>> > > > > >> >> > readme
>> > > > > >> >> > > > could be useful.
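To make the renewal timing concrete: frameworks that manage delegation tokens themselves (Spark's HadoopDelegationTokenManager, for instance) typically re-obtain the token once a fixed fraction of its validity has elapsed, leaving headroom for retries. A minimal sketch with illustrative names, not Flink's actual DTM API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Sketch of a delegation-token renewal schedule; all names are illustrative. */
public final class TokenRenewalSketch {

    /**
     * Common heuristic: re-obtain the token after a fixed fraction (e.g. 75%)
     * of its validity has elapsed, leaving headroom for retries.
     */
    public static long nextRenewalDelayMillis(long issueTime, long expiryTime, double ratio) {
        return (long) ((expiryTime - issueTime) * ratio);
    }

    /** Schedules one re-obtainment; obtainToken returns the new token's expiry. */
    public static ScheduledExecutorService startRenewalLoop(
            LongSupplier obtainToken, long issueTime, long expiryTime, double ratio) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        long delay = nextRenewalDelayMillis(issueTime, expiryTime, ratio);
        executor.schedule(() -> {
            long newExpiry = obtainToken.getAsLong();
            // a real DTM would broadcast the new token to the TMs here
            // and re-schedule itself based on newExpiry ...
        }, delay, TimeUnit.MILLISECONDS);
        return executor;
    }

    public static void main(String[] args) {
        // A token valid for 24h, renewed at 75% of its lifetime -> 18h delay.
        long day = 24L * 60 * 60 * 1000;
        System.out.println(nextRenewalDelayMillis(0, day, 0.75)); // prints 64800000
    }
}
```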
>> > > > > >> >> > > >
>> > > > > >> >> > > > G
>> > > > > >> >> > > >
>> > > > > >> >> > > > On Fri, 21 Jan 2022, 21:05 David Morávek, <
>> > d...@apache.org
>> > > >
>> > > > > >> wrote:
>> > > > > >> >> > > >
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > > > JobManager is the Flink class.
>> > > > > >> >> > > > >
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > There is no such class in Flink. The closest thing
>> to
>> > the
>> > > > > >> >> JobManager
>> > > > > >> >> > > is a
>> > > > > >> >> > > > > ClusterEntrypoint. The cluster entrypoint spawns
>> new RM
>> > > > > Runner
>> > > > > >> &
>> > > > > >> >> > > > Dispatcher
>> > > > > >> >> > > > > Runner that start participating in the leader
>> election.
>> > > > Once
>> > > > > >> they
>> > > > > >> >> > gain
>> > > > > >> >> > > > > leadership they spawn the actual underlying
>> instances
>> > of
>> > > > > these
>> > > > > >> two
>> > > > > >> >> > > "main
>> > > > > >> >> > > > > components".
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > My feeling would be that we shouldn't really
>> introduce
>> > a
>> > > > new
>> > > > > >> >> > component
>> > > > > >> >> > > > with
>> > > > > >> >> > > > > a custom lifecycle, but rather we should try to
>> > > incorporate
>> > > > > >> this
>> > > > > >> >> into
>> > > > > >> >> > > > > existing ones.
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > My biggest concerns would be:
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > - How would the lifecycle of the new component look
>> > like
>> > > > with
>> > > > > >> >> regards
>> > > > > >> >> > > to
>> > > > > >> >> > > > HA
>> > > > > >> >> > > > > setups. If we really try to decide to introduce a
>> > > > completely
>> > > > > >> new
>> > > > > >> >> > > > component,
>> > > > > >> >> > > > > how should this work in case of multiple JobManager
>> > > > > instances?
>> > > > > >> >> > > > > - Which components does it talk to / how? For
>> example
>> > how
>> > > > > does
>> > > > > >> the
>> > > > > >> >> > > > > broadcast of new token to task managers
>> > > > (TaskManagerGateway)
>> > > > > >> look
>> > > > > >> >> > like?
>> > > > > >> >> > > > Do
>> > > > > >> >> > > > > we simply introduce a new RPC on the
>> > > ResourceManagerGateway
>> > > > > >> that
>> > > > > >> >> > > > broadcasts
>> > > > > >> >> > > > > it or does the new component need to do some kind of
>> > > > > >> bookkeeping
>> > > > > >> >> of
>> > > > > >> >> > > task
>> > > > > >> >> > > > > managers that it needs to notify?
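For illustration, the bookkeeping variant asked about here can be as small as a map from resource ID to a gateway, with the registration step delivering the current tokens and later renewals going through a broadcast. All names below (`TokenReceiver`, `updateDelegationTokens`) are hypothetical stand-ins, not Flink's actual RPC surface:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch of task-manager token bookkeeping; not Flink's actual API. */
public final class TokenBroadcastSketch {

    /** Stand-in for something like a TaskExecutorGateway RPC. */
    public interface TokenReceiver {
        void updateDelegationTokens(byte[] serializedTokens);
    }

    private final Map<String, TokenReceiver> taskExecutors = new ConcurrentHashMap<>();

    /** Registration delivers the current tokens; later updates go through broadcast. */
    public void register(String resourceId, TokenReceiver gateway, byte[] currentTokens) {
        gateway.updateDelegationTokens(currentTokens); // initial tokens at registration time
        taskExecutors.put(resourceId, gateway);
    }

    public void unregister(String resourceId) {
        taskExecutors.remove(resourceId);
    }

    /** Called by the DTM renewal thread after a successful re-obtainment. */
    public void broadcast(byte[] newTokens) {
        taskExecutors.values().forEach(g -> g.updateDelegationTokens(newTokens));
    }

    public static void main(String[] args) {
        TokenBroadcastSketch registry = new TokenBroadcastSketch();
        registry.register("tm-1", t -> System.out.println("tm-1 got " + t.length + " bytes"), new byte[8]);
        registry.broadcast(new byte[16]); // the renewal thread would call this
    }
}
```

The HA concern raised above is untouched by this sketch: on leader change the map would have to be rebuilt from re-registrations, which is why tying the lifecycle to an existing leader-elected component is attractive.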
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > YARN-based HDFS log aggregation would not work by
>> > > dropping
>> > > > > that
>> > > > > >> >> code.
>> > > > > >> >> > > > Just
>> > > > > >> >> > > > > > to be crystal clear, the actual implementation
>> > contains
>> > > > > this
>> > > > > >> for
>> > > > > >> >> > > > exactly
>> > > > > >> >> > > > > > this reason.
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > This is the missing part +1. If I understand this
>> > > > correctly,
>> > > > > >> this
>> > > > > >> >> > means
>> > > > > >> >> > > > > that we then push the token renewal logic to YARN.
>> How
>> > do
>> > > > you
>> > > > > >> >> plan to
>> > > > > >> >> > > > > implement the renewal logic on k8s?
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > D.
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > On Fri, Jan 21, 2022 at 8:37 PM Gabor Somogyi <
>> > > > > >> >> > > gabor.g.somo...@gmail.com
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > wrote:
>> > > > > >> >> > > > >
>> > > > > >> >> > > > > > > I think we might both mean something different
>> by
>> > the
>> > > > RM.
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > > > You sense it right; I've not specified these terms
>> well
>> > > in
>> > > > > the
>> > > > > >> >> > > > explanation.
>> > > > > >> >> > > > > > RM I meant resource management framework.
>> JobManager
>> > is
>> > > > the
>> > > > > >> >> Flink
>> > > > > >> >> > > > class.
>> > > > > >> >> > > > > > This means that inside JM instance there will be a
>> > DTM
>> > > > > >> >> instance, so
>> > > > > >> >> > > > they
>> > > > > >> >> > > > > > would have the same lifecycle. Hope I've answered
>> the
>> > > > > >> question.
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > > > > If we have tokens available on the client side,
>> why
>> > > do
>> > > > we
>> > > > > >> >> need to
>> > > > > >> >> > > set
>> > > > > >> >> > > > > > them
>> > > > > >> >> > > > > > into the AM (yarn specific concept) launch
>> context?
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > > > YARN-based HDFS log aggregation would not work by
>> > > > dropping
>> > > > > >> that
>> > > > > >> >> > code.
>> > > > > >> >> > > > > Just
>> > > > > >> >> > > > > > to be crystal clear, the actual implementation
>> > contains
>> > > > > this
>> > > > > >> for
>> > > > > >> >> > > > exactly
>> > > > > >> >> > > > > > this reason.
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > > > G
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > > > On Fri, 21 Jan 2022, 20:12 David Morávek, <
>> > > > d...@apache.org
>> > > > > >
>> > > > > >> >> wrote:
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > > > > Hi Gabor,
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > 1. One thing is important, token management is
>> > > planned
>> > > > to
>> > > > > >> be
>> > > > > >> >> done
>> > > > > >> >> > > > > > > > generically within Flink and not scattered in
>> RM
>> > > > > specific
>> > > > > >> >> code.
>> > > > > >> >> > > > > > > JobManager
>> > > > > >> >> > > > > > > > has a DelegationTokenManager which obtains
>> tokens
>> > > > > >> >> time-to-time
>> > > > > >> >> > > (if
>> > > > > >> >> > > > > > > > configured properly). JM knows which
>> TaskManagers
>> > > are
>> > > > > in
>> > > > > >> >> place
>> > > > > >> >> > so
>> > > > > >> >> > > > it
>> > > > > >> >> > > > > > can
>> > > > > >> >> > > > > > > > distribute it to all TMs. That's it basically.
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > I think we might both mean something different
>> by
>> > the
>> > > > RM.
>> > > > > >> >> > > JobManager
>> > > > > >> >> > > > is
>> > > > > >> >> > > > > > > basically just a process encapsulating multiple
>> > > > > components,
>> > > > > >> >> one
>> > > > > >> >> > of
>> > > > > >> >> > > > > which
>> > > > > >> >> > > > > > is
>> > > > > >> >> > > > > > > a ResourceManager, which is the component that
>> > > manages
>> > > > > task
>> > > > > >> >> > manager
>> > > > > >> >> > > > > > > registrations [1]. There is more or less a
>> single
>> > > > > >> >> implementation
>> > > > > >> >> > of
>> > > > > >> >> > > > the
>> > > > > >> >> > > > > > RM
>> > > > > >> >> > > > > > > with pluggable drivers for the active
>> integrations
>> > > > (yarn,
>> > > > > >> k8s).
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > It would be great if you could share more
>> details
>> > of
>> > > > how
>> > > > > >> >> exactly
>> > > > > >> >> > > the
>> > > > > >> >> > > > > DTM
>> > > > > >> >> > > > > > is
>> > > > > >> >> > > > > > > going to fit in the current JM architecture.
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > 2. 99.9% of the code is generic but each RM
>> handles
>> > > > > tokens
>> > > > > >> >> > > > > differently. A
>> > > > > >> >> > > > > > > > good example is YARN obtains tokens on client
>> > side
>> > > > and
>> > > > > >> then
>> > > > > >> >> > sets
>> > > > > >> >> > > > them
>> > > > > >> >> > > > > > on
>> > > > > >> >> > > > > > > > the newly created AM container launch context.
>> > This
>> > > > is
>> > > > > >> >> purely
>> > > > > >> >> > > YARN
>> > > > > >> >> > > > > > > specific
>> > > > > >> >> > > > > > > > and can't be avoided. With my current plans
>> > > standalone
>> > > > > >> can be
>> > > > > >> >> > > > changed
>> > > > > >> >> > > > > to
>> > > > > >> >> > > > > > > use
>> > > > > >> >> > > > > > > > the framework. By using it I mean no RM
>> specific
>> > > DTM
>> > > > or
>> > > > > >> >> > > whatsoever
>> > > > > >> >> > > > is
>> > > > > >> >> > > > > > > > needed.
>> > > > > >> >> > > > > > > >
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > If we have tokens available on the client side,
>> why
>> > > do
>> > > > we
>> > > > > >> >> need to
>> > > > > >> >> > > set
>> > > > > >> >> > > > > > them
>> > > > > >> >> > > > > > > into the AM (yarn specific concept) launch
>> context?
>> > > Why
>> > > > > >> can't
>> > > > > >> >> we
>> > > > > >> >> > > > simply
>> > > > > >> >> > > > > > > send them to the JM, eg. as a parameter of the
>> job
>> > > > > >> submission
>> > > > > >> >> /
>> > > > > >> >> > via
>> > > > > >> >> > > > > > > separate RPC call? There might be something I'm
>> > > missing
>> > > > > >> due to
>> > > > > >> >> > > > limited
>> > > > > >> >> > > > > > > knowledge, but handling the token on the
>> "cluster
>> > > > > >> framework"
>> > > > > >> >> > level
>> > > > > >> >> > > > > > doesn't
>> > > > > >> >> > > > > > > seem necessary.
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > [1]
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > >
>> > > > > >> >> > > >
>> > > > > >> >> > >
>> > > > > >> >> >
>> > > > > >> >>
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#jobmanager
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > Best,
>> > > > > >> >> > > > > > > D.
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > On Fri, Jan 21, 2022 at 7:48 PM Gabor Somogyi <
>> > > > > >> >> > > > > gabor.g.somo...@gmail.com
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > wrote:
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > > > > Oh and one more thing. I'm planning to add
>> this
>> > > > feature
>> > > > > >> in
>> > > > > >> >> > small
>> > > > > >> >> > > > > chunks
>> > > > > >> >> > > > > > of
>> > > > > >> >> > > > > > > > PRs because security is a super hairy area. That
>> > way
>> > > > > >> reviewers
>> > > > > >> >> > can
>> > > > > >> >> > > more
>> > > > > >> >> > > > > > easily
>> > > > > >> >> > > > > > > > grasp the concept.
>> > > > > >> >> > > > > > > >
>> > > > > >> >> > > > > > > > On Fri, 21 Jan 2022, 18:03 David Morávek, <
>> > > > > >> d...@apache.org>
>> > > > > >> >> > > wrote:
>> > > > > >> >> > > > > > > >
>> > > > > >> >> > > > > > > > > Hi Gabor,
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > > > thanks for drafting the FLIP, I think
>> having a
>> > > > solid
>> > > > > >> >> Kerberos
>> > > > > >> >> > > > > support
>> > > > > >> >> > > > > > > is
>> > > > > >> >> > > > > > > > > crucial for many enterprise deployments.
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > > > I have multiple questions regarding the
>> > > > > implementation
>> > > > > >> >> (note
>> > > > > >> >> > > > that I
>> > > > > >> >> > > > > > > have
>> > > > > >> >> > > > > > > > > very limited knowledge of Kerberos):
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > > > 1) If I understand it correctly, we'll only
>> > > obtain
>> > > > > >> tokens
>> > > > > >> >> in
>> > > > > >> >> > > the
>> > > > > >> >> > > > > job
>> > > > > >> >> > > > > > > > > manager and then we'll distribute them via
>> RPC
>> > > > (needs
>> > > > > >> to
>> > > > > >> >> be
>> > > > > >> >> > > > > secured).
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > > > Can you please outline how the communication
>> > will
>> > > > > look
>> > > > > >> >> like?
>> > > > > >> >> > Is
>> > > > > >> >> > > > the
>> > > > > >> >> > > > > > > > > DelegationTokenManager going to be a part of
>> > the
>> > > > > >> >> > > ResourceManager?
>> > > > > >> >> > > > > Can
>> > > > > >> >> > > > > > > you
>> > > > > >> >> > > > > > > > > outline its lifecycle / how it's going to
>> be
>> > > > > >> integrated
>> > > > > >> >> > there?
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > > > 2) Do we really need a YARN / k8s specific
>> > > > > >> >> implementations?
>> > > > > >> >> > Is
>> > > > > >> >> > > it
>> > > > > >> >> > > > > > > > possible
>> > > > > >> >> > > > > > > > > to obtain / renew a token in a generic way?
>> > Maybe
>> > > > to
>> > > > > >> >> rephrase
>> > > > > >> >> > > > that,
>> > > > > >> >> > > > > > is
>> > > > > >> >> > > > > > > it
>> > > > > >> >> > > > > > > > > possible to implement DelegationTokenManager
>> > for
>> > > > the
>> > > > > >> >> > standalone
>> > > > > >> >> > > > > > Flink?
>> > > > > >> >> > > > > > > If
>> > > > > >> >> > > > > > > > > we're able to solve this point, it could be
>> > > > possible
>> > > > > to
>> > > > > >> >> > target
>> > > > > >> >> > > > all
>> > > > > >> >> > > > > > > > > deployment scenarios with a single
>> > > implementation.
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > > > Best,
>> > > > > >> >> > > > > > > > > D.
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > > > On Fri, Jan 14, 2022 at 3:47 AM Junfan
>> Zhang <
>> > > > > >> >> > > > > > zuston.sha...@gmail.com>
>> > > > > >> >> > > > > > > > > wrote:
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > > > > Hi G
>> > > > > >> >> > > > > > > > > >
>> > > > > >> >> > > > > > > > > > Thanks for your detailed explanation. I have
>> > > understood
>> > > > > your
>> > > > > >> >> > > thoughts,
>> > > > > >> >> > > > > and
>> > > > > >> >> > > > > > > any
>> > > > > >> >> > > > > > > > > > way this proposal
>> > > > > >> >> > > > > > > > > > is a great improvement.
>> > > > > >> >> > > > > > > > > >
>> > > > > >> >> > > > > > > > > > Looking forward to your implementation
>> and i
>> > > will
>> > > > > >> keep
>> > > > > >> >> > focus
>> > > > > >> >> > > on
>> > > > > >> >> > > > > it.
>> > > > > >> >> > > > > > > > > > Thanks again.
>> > > > > >> >> > > > > > > > > >
>> > > > > >> >> > > > > > > > > > Best
>> > > > > >> >> > > > > > > > > > JunFan.
>> > > > > >> >> > > > > > > > > > On Jan 13, 2022, 9:20 PM +0800, Gabor
>> > Somogyi <
>> > > > > >> >> > > > > > > > gabor.g.somo...@gmail.com
>> > > > > >> >> > > > > > > > > >,
>> > > > > >> >> > > > > > > > > > wrote:
>> > > > > >> >> > > > > > > > > > > Just to confirm: keeping
>> > > > > >> >> > > > > > "security.kerberos.fetch.delegation-token"
>> > > > > >> >> > > > > > > is
>> > > > > >> >> > > > > > > > > > added
>> > > > > >> >> > > > > > > > > > > to the doc.
>> > > > > >> >> > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > BR,
>> > > > > >> >> > > > > > > > > > > G
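For reference, the option being confirmed here would presumably sit in flink-conf.yaml next to the existing Kerberos login options. The fragment below is illustrative only; the keytab path and principal are placeholders, and the FLIP/doc defines the authoritative option list:

```yaml
# Illustrative flink-conf.yaml fragment; option names follow the thread's discussion.
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
# Set to false to disable delegation-token fetching entirely
# (e.g. on unsecured clusters where no KDC is reachable).
security.kerberos.fetch.delegation-token: true
```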
>> > > > > >> >> > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > On Thu, Jan 13, 2022 at 1:34 PM Gabor
>> > > Somogyi <
>> > > > > >> >> > > > > > > > > gabor.g.somo...@gmail.com
>> > > > > >> >> > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > wrote:
>> > > > > >> >> > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > Hi JunFan,
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > > By the way, maybe this should be
>> added
>> > in
>> > > > the
>> > > > > >> >> > migration
>> > > > > >> >> > > > > plan
>> > > > > >> >> > > > > > or
>> > > > > >> >> > > > > > > > > > > > integration section in the FLIP-211.
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > Going to add this soon.
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > > Besides, I have a question that the
>> KDC
>> > > > will
>> > > > > >> >> collapse
>> > > > > >> >> > > > when
>> > > > > >> >> > > > > > the
>> > > > > >> >> > > > > > > > > > cluster
>> > > > > >> >> > > > > > > > > > > > reached 200 nodes you described
>> > > > > >> >> > > > > > > > > > > > in the google doc. Do you have any
>> > > attachment
>> > > > > or
>> > > > > >> >> > > reference
>> > > > > >> >> > > > to
>> > > > > >> >> > > > > > > prove
>> > > > > >> >> > > > > > > > > it?
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > "KDC *may* collapse under some
>> > > circumstances"
>> > > > > is
>> > > > > >> the
>> > > > > >> >> > > proper
>> > > > > >> >> > > > > > > > wording.
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > We have several customers who are
>> > executing
>> > > > > >> >> workloads
>> > > > > >> >> > on
>> > > > > >> >> > > > > > > > Spark/Flink.
>> > > > > >> >> > > > > > > > > > Most
>> > > > > >> >> > > > > > > > > > > > of the time I'm facing their
>> > > > > >> >> > > > > > > > > > > > daily issues which is heavily
>> environment
>> > > and
>> > > > > >> >> use-case
>> > > > > >> >> > > > > > dependent.
>> > > > > >> >> > > > > > > > > I've
>> > > > > >> >> > > > > > > > > > > > seen various cases:
>> > > > > >> >> > > > > > > > > > > > * where the mentioned ~1k nodes were
>> > > working
>> > > > > fine
>> > > > > >> >> > > > > > > > > > > > * where KDC thought the number of
>> > requests
>> > > > are
>> > > > > >> >> coming
>> > > > > >> >> > > from
>> > > > > >> >> > > > > DDOS
>> > > > > >> >> > > > > > > > > attack
>> > > > > >> >> > > > > > > > > > so
>> > > > > >> >> > > > > > > > > > > > discontinued authentication
>> > > > > >> >> > > > > > > > > > > > * where KDC was simply not responding
>> > > because
>> > > > > of
>> > > > > >> the
>> > > > > >> >> > load
>> > > > > >> >> > > > > > > > > > > > * where KDC intermittently had
>> some
>> > > > outage
>> > > > > >> (this
>> > > > > >> >> > was
>> > > > > >> >> > > > the
>> > > > > >> >> > > > > > most
>> > > > > >> >> > > > > > > > > nasty
>> > > > > >> >> > > > > > > > > > > > thing)
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > Since you're managing relatively big
>> > > cluster
>> > > > > then
>> > > > > >> >> you
>> > > > > >> >> > > know
>> > > > > >> >> > > > > that
>> > > > > >> >> > > > > > > KDC
>> > > > > >> >> > > > > > > > > is
>> > > > > >> >> > > > > > > > > > not
>> > > > > >> >> > > > > > > > > > > > only used by Spark/Flink workloads
>> > > > > >> >> > > > > > > > > > > > but the whole company IT
>> infrastructure
>> > is
>> > > > > >> bombing
>> > > > > >> >> it
>> > > > > >> >> > so
>> > > > > >> >> > > it
>> > > > > >> >> > > > > > > really
>> > > > > >> >> > > > > > > > > > depends
>> > > > > >> >> > > > > > > > > > > > on other factors too whether KDC is
>> > > reaching
>> > > > > >> >> > > > > > > > > > > > it's limit or not. Not sure what kind
>> of
>> > > > > evidence
>> > > > > >> >> are
>> > > > > >> >> > you
>> > > > > >> >> > > > > > looking
>> > > > > >> >> > > > > > > > for
>> > > > > >> >> > > > > > > > > > but
>> > > > > >> >> > > > > > > > > > > > I'm not authorized to share any
>> > information
>> > > > > about
>> > > > > >> >> > > > > > > > > > > > our clients data.
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > One thing is for sure. The more
>> external
>> > > > system
>> > > > > >> >> types
>> > > > > >> >> > are
>> > > > > >> >> > > > > used
>> > > > > >> >> > > > > > in
>> > > > > >> >> > > > > > > > > > > > workloads (for ex. HDFS, HBase, Hive,
>> > > Kafka)
>> > > > > >> which
>> > > > > >> >> > > > > > > > > > > > are authenticating through KDC the
>> more
>> > > > > >> possibility
>> > > > > >> >> to
>> > > > > >> >> > > > reach
>> > > > > >> >> > > > > > this
>> > > > > >> >> > > > > > > > > > > > threshold when the cluster is big
>> enough.
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > All in all this feature is here to
>> help
>> > all
>> > > > > users
>> > > > > >> >> never
>> > > > > >> >> > > > reach
>> > > > > >> >> > > > > > > this
>> > > > > >> >> > > > > > > > > > > > limitation.
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > BR,
>> > > > > >> >> > > > > > > > > > > > G
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > On Thu, Jan 13, 2022 at 1:00 PM 张俊帆 <
>> > > > > >> >> > > > zuston.sha...@gmail.com
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > > > > > wrote:
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > > Hi G
>> > > > > >> >> > > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > > Thanks for your quick reply. I think
>> > > > > reserving
>> > > > > >> the
>> > > > > >> >> > > config
>> > > > > >> >> > > > > of
>> > > > > >> >> > > > > > > > > > > > >
>> > > *security.kerberos.fetch.delegation-token*
>> > > > > >> >> > > > > > > > > > > > > and simply disabling the token
>> > > fetching
>> > > > > is a
>> > > > > >> >> good
>> > > > > >> >> > > > > idea. By
>> > > > > >> >> > > > > > > the
>> > > > > >> >> > > > > > > > > way,
>> > > > > >> >> > > > > > > > > > > > > maybe this should be added
>> > > > > >> >> > > > > > > > > > > > > in the migration plan or integration
>> > > > section
>> > > > > in
>> > > > > >> >> the
>> > > > > >> >> > > > > FLIP-211.
>> > > > > >> >> > > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > > Besides, I have a question that the
>> KDC
>> > > > will
>> > > > > >> >> collapse
>> > > > > >> >> > > > when
>> > > > > >> >> > > > > > the
>> > > > > >> >> > > > > > > > > > cluster
>> > > > > >> >> > > > > > > > > > > > > reached 200 nodes you described
>> > > > > >> >> > > > > > > > > > > > > in the google doc. Do you have any
>> > > > attachment
>> > > > > >> or
>> > > > > >> >> > > > reference
>> > > > > >> >> > > > > to
>> > > > > >> >> > > > > > > > prove
>> > > > > >> >> > > > > > > > > > it?
>> > > > > >> >> > > > > > > > > > > > > Because in our internal per-cluster,
>> > > > > >> >> > > > > > > > > > > > > the nodes reach > 1000 and KDC
>> looks
>> > > > good.
>> > > > > >> Do i
>> > > > > >> >> > > missed
>> > > > > >> >> > > > or
>> > > > > >> >> > > > > > > > > > misunderstood
>> > > > > >> >> > > > > > > > > > > > > something? Please correct me.
>> > > > > >> >> > > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > > Best
>> > > > > >> >> > > > > > > > > > > > > JunFan.
>> > > > > >> >> > > > > > > > > > > > > On Jan 13, 2022, 5:26 PM +0800,
>> > > > > >> >> dev@flink.apache.org
>> > > > > >> >> > ,
>> > > > > >> >> > > > > wrote:
>> > > > > >> >> > > > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > >
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > >
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > >
>> > > > > >> >> > > >
>> > > > > >> >> > >
>> > > > > >> >> >
>> > > > > >> >>
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
>> > > > > >> >> > > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > > > >
>> > > > > >> >> > > > > > > > > >
>> > > > > >> >> > > > > > > > >
>> > > > > >> >> > > > > > > >
>> > > > > >> >> > > > > > >
>> > > > > >> >> > > > > >
>> > > > > >> >> > > > >
>> > > > > >> >> > > >
>> > > > > >> >> > >
>> > > > > >> >> >
>> > > > > >> >>
>> > > > > >> >
>> > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
