Not sure whether the mentioned write permission has already been granted, but I still don't see any edit button.
G

On Fri, Jan 28, 2022 at 5:08 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Hi Robert,

That would be awesome.

My cwiki username: gaborgsomogyi

G

On Fri, Jan 28, 2022 at 5:06 PM Robert Metzger <metrob...@gmail.com> wrote:

Hey Gabor,

let me know your cwiki username, and I can give you write permissions.

On Fri, Jan 28, 2022 at 4:05 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Thanks for making the design better! Nothing further to discuss from my side.

I've started to reflect the agreement in the FLIP doc. Since I don't have access to the wiki, I need to ask Marci to do that, which may take some time.

G

On Fri, Jan 28, 2022 at 3:52 PM David Morávek <d...@apache.org> wrote:

Hi,

> AFAIU an under-registration TM is not added to the registered TMs map until the RegistrationResponse ..

I think you're right. With a careful design around threading (delegating update broadcasts to the main thread) + a synchronous initial update (which would be nice to avoid), this should be doable.

> Not sure what you mean by "we can't register the TM without providing it with a token", but in an unsecured configuration registration must happen without tokens.

Exactly as you describe it: this was meant only for the "kerberized / secured" cluster case. In other cases we wouldn't enforce a non-null token in the response.

> I think this is a good idea in general.

+1

If you don't have any more thoughts on the RPC / lifecycle part, can you please reflect it in the FLIP?

D.
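A minimal Java sketch of the threading idea mentioned above, assuming a single-thread executor as a stand-in for the ResourceManager main thread. The class `MainThreadTokenDistributor`, its methods and the in-memory "push" are hypothetical illustrations, not Flink APIs: registration adds the TM and reads the current tokens in one main-thread step, and broadcasts triggered by the DTM's renewal thread are delegated to that same thread, so the two cannot interleave.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical RM-side token distributor (not a Flink API). The single-thread executor stands in
 * for the ResourceManager main thread, so registrations and broadcasts are serialized.
 */
final class MainThreadTokenDistributor implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Registered TM id -> tokens pushed to it so far (recorded only for illustration).
    private final Map<String, List<byte[]>> registeredTms = new LinkedHashMap<>();
    // Null until the DTM has obtained tokens; stays null in an unsecured cluster.
    private byte[] currentTokens;

    /** Adds the TM and reads the current tokens in one main-thread step (for the registration response). */
    Future<byte[]> register(String tmId) {
        return mainThread.submit(() -> {
            registeredTms.put(tmId, new ArrayList<>());
            return currentTokens;
        });
    }

    /** Called from the DTM's renewal thread; the broadcast itself runs on the main thread. */
    Future<?> onNewTokens(byte[] tokens) {
        return mainThread.submit(() -> {
            currentTokens = tokens;
            for (List<byte[]> pushed : registeredTms.values()) {
                pushed.add(tokens); // stand-in for the updateDelegationTokens RPC
            }
        });
    }

    /** How many broadcasts a TM has received; the read also runs on the main thread. */
    Future<Integer> pushCount(String tmId) {
        return mainThread.submit(() -> registeredTms.getOrDefault(tmId, new ArrayList<>()).size());
    }

    /** Waits for a main-thread task, rethrowing failures unchecked. */
    static <T> T await(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

Because `register` adds the TM and reads `currentTokens` in the same main-thread task, a registering TM either receives the latest tokens in its registration response or is already in the map when the next broadcast runs; it cannot miss both.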
On Fri, Jan 28, 2022 at 3:16 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> - Make sure DTs issued by a single DTM are monotonically increasing (can be sorted on the TM side)

AFAIU an under-registration TM is not added to the registered TMs map until the RegistrationResponse, which would contain the initial tokens, is processed. If that's true, then how is it possible to have a race with the DTM update, which works on the registered TMs list? To be more specific, "taskExecutors" is the registered map of TMs to which the DTM can send updated tokens, but it doesn't contain the under-registration TM while the RegistrationResponse is not yet processed, right?

Of course, if the DTM can send an update while the RegistrationResponse is being processed, then some kind of sorting would be required, and in that case I would agree.

> - Scope DT updates by the RM ID and ensure that the TM only accepts updates from the current leader

I initially planned it the way you mention, so agreed.

> - Return the initial token with the RegistrationResponse, which should make the RPC contract a bit clearer (ensure that we can't register the TM without providing it with a token)

I think this is a good idea in general. Not sure what you mean by "we can't register the TM without providing it with a token", but in an unsecured configuration registration must happen without tokens. All in all, the newly added tokens field must somehow be optional.
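A sketch of an optional tokens field in the registration response, as discussed above. `RegistrationResult` and `validated` are hypothetical names, not Flink's actual registration classes; the sketch assumes a non-null token is enforced only when security is enabled, so unsecured clusters register TMs without any tokens.

```java
import java.util.Optional;

/**
 * Hypothetical registration response payload (not Flink's actual class). The tokens field is
 * optional so that unsecured clusters can register TMs without any delegation tokens.
 */
final class RegistrationResult {
    private final byte[] initialTokens; // null when Kerberos is not enabled

    private RegistrationResult(byte[] initialTokens) {
        this.initialTokens = initialTokens == null ? null : initialTokens.clone();
    }

    /** Initial tokens for the TM, if any; returns a defensive copy. */
    Optional<byte[]> initialTokens() {
        return Optional.ofNullable(initialTokens).map(byte[]::clone);
    }

    /** Enforces a non-null token only for a kerberized / secured cluster. */
    static RegistrationResult validated(byte[] initialTokens, boolean securityEnabled) {
        if (securityEnabled && initialTokens == null) {
            throw new IllegalStateException(
                    "Secured cluster: registration must carry initial delegation tokens");
        }
        return new RegistrationResult(initialTokens);
    }
}
```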
G

On Fri, Jan 28, 2022 at 2:22 PM David Morávek <d...@apache.org> wrote:

We had a long discussion with Chesnay about the possible edge cases, and it basically boils down to the following two scenarios:

1) There is a possible race condition between TM registration (the first DT update) and token refresh if they happen simultaneously. Then the registration might beat the refreshed token. This could easily be addressed if DTs could be sorted (e.g. by expiration time) on the TM side. In other words, if there are multiple updates at the same time, we need a deterministic way of choosing the latest one.

One idea by Chesnay that popped up during this discussion was whether we could simply return the initial token with the RegistrationResponse to avoid making an extra call during TM registration.

2) When the RM leadership changes (e.g. because the ZooKeeper session times out), there might be a race condition where the old RM, while shutting down, updates the tokens, so that it might again beat the registration token of the new RM. This could be avoided if we scope the token by _ResourceManagerId_ and only accept updates from the current leader (basically we'd have an extra parameter on the _updateDelegationToken_ method).

DTM is way simpler than, for example, slot management, which could receive updates from the JobMaster that the RM might not know about.
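A possible TM-side guard covering both scenarios, sketched under the assumption that each update carries the sender's ResourceManager ID (modeled here as a plain `String`) and a per-DTM sequence number that increases monotonically (an expiration timestamp could play the same role). `DelegationTokenUpdateGuard` is hypothetical, not an existing Flink class.

```java
import java.util.Objects;

/**
 * Hypothetical TM-side guard (not a Flink class). Accepts a delegation token update only if it
 * comes from the current RM leader and is newer than the last accepted one.
 */
final class DelegationTokenUpdateGuard {
    private String currentLeaderId; // stand-in for ResourceManagerId; null until registered
    private long lastAcceptedSequence = Long.MIN_VALUE;

    /** Called when the TM (re)registers with a new ResourceManager leader. */
    synchronized void onLeaderChange(String newLeaderId) {
        currentLeaderId = Objects.requireNonNull(newLeaderId);
        lastAcceptedSequence = Long.MIN_VALUE; // sequences are only comparable within one DTM
    }

    /** Returns true if the update should be applied to the TM's credentials. */
    synchronized boolean accept(String senderLeaderId, long sequence) {
        if (currentLeaderId == null || !currentLeaderId.equals(senderLeaderId)) {
            return false; // scenario 2: late update from an old (or unknown) RM
        }
        if (sequence <= lastAcceptedSequence) {
            return false; // scenario 1: an older token lost the race against a newer one
        }
        lastAcceptedSequence = sequence;
        return true;
    }
}
```

Resetting the sequence on leader change assumes sequence numbers are only ever compared between updates issued by the same DTM, which matches the "issued by a single DTM" wording of the proposal.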
So if you want to go down the path you're describing, it should be doable, and we'd propose the following to cover all cases:

- Make sure DTs issued by a single DTM are monotonically increasing (can be sorted on the TM side)
- Scope DT updates by the RM ID and ensure that the TM only accepts updates from the current leader
- Return the initial token with the RegistrationResponse, which should make the RPC contract a bit clearer (ensure that we can't register the TM without providing it with a token)

Any thoughts?

On Fri, Jan 28, 2022 at 10:53 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Thanks for investing your time!

The first 2 bullet points are clear. If there is a chance that a TM can get into an inconsistent state, then I agree with the 3rd bullet point. Just before we agree on that, I would like to learn something new and understand how it is possible for a TM to get corrupted. (In Spark I've never seen such a thing, nor any mechanism to fix it, but Flink is definitely not Spark.)

Here is my understanding:
* DTM pushes newly obtained DTs to TMs, and if any exception occurs, a retry happens after "security.kerberos.tokens.retry-wait". This means DTM keeps retrying as long as it's not possible to send the new DTs to all registered TMs.
* New TM registration must fail if "updateDelegationToken" fails
* "updateDelegationToken" fails consistently, like a DB (at least I plan to implement it that way).
  If DTs arrive on the TM side, a single "UserGroupInformation.getCurrentUser.addCredentials" call is made, which I've never seen fail.
* I hope no other code parts touch the existing DTs within the JVM

I would like to emphasize that I'm not against adding it; I just want to see what kind of problems we are facing. It would make it easier to catch bugs earlier and help with maintenance.

All in all, I would buy the idea of adding the 3rd bullet if we foresee the need.

G

On Fri, Jan 28, 2022 at 10:07 AM David Morávek <d...@apache.org> wrote:

Hi Gabor,

This is definitely headed in the right direction, +1.

I think we still need a safeguard in case some of the TMs get into an inconsistent state, though, which would also eliminate the need for implementing a custom retry mechanism (when the _updateDelegationToken_ call fails for some reason).

We already have this safeguard in place for the slot pool (in case some slots are in an inconsistent state, e.g. we haven't freed them for some reason) and for the partition tracker, and it could simply be enhanced. This is done via a periodic heartbeat from the TaskManagers to the ResourceManager that contains a report about the state of these two components (from the TM perspective), so the RM can reconcile their state if necessary.
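A minimal sketch of such heartbeat-based reconciliation, assuming the TM reports only a short fingerprint of its current tokens rather than the tokens themselves. `TokenReconciler` is a hypothetical helper, not part of Flink's heartbeat payload classes, and the 8-byte SHA-256 prefix is an arbitrary choice for illustration.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Objects;

/**
 * Hypothetical helper (not a Flink API) for heartbeat-based reconciliation: the TM reports a short
 * fingerprint of its tokens and the RM re-pushes tokens when the fingerprint does not match.
 */
final class TokenReconciler {
    private static final int FINGERPRINT_BYTES = 8; // 16 hex chars; arbitrary choice for illustration

    /** TM side: compact identifier of the serialized tokens, sent in the heartbeat payload. */
    static String fingerprint(byte[] serializedTokens) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(serializedTokens);
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < FINGERPRINT_BYTES; i++) {
                hex.append(String.format("%02x", digest[i]));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every Java platform", e);
        }
    }

    /**
     * RM side: true if the TM should receive another updateDelegationToken push. A null report
     * means the TM has no tokens yet.
     */
    static boolean needsResync(String reportedFingerprint, byte[] currentTokens) {
        if (currentTokens == null) {
            return false; // nothing to reconcile (e.g. unsecured cluster or no tokens obtained yet)
        }
        return !Objects.equals(reportedFingerprint, fingerprint(currentTokens));
    }
}
```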
I don't think adding an additional field to _TaskExecutorHeartbeatPayload_ should be a concern, as we only heartbeat every ~10s by default and the new field would be small compared to the rest of the existing payload. Also, the heartbeat doesn't need to contain the whole DT, just some identifier that signals whether the TM uses the right one, which could be significantly smaller.

This is still a PUSH-based approach, as the RM would again call the newly introduced _updateDelegationToken_ when it encounters an inconsistency (e.g. due to a temporary network partition / a race condition we didn't test for / some other scenario we didn't think about). In practice these inconsistencies are super hard to avoid and reason about (and unfortunately, yes, we see them happen from time to time), so reusing the existing mechanism that is designed for this exact problem simplifies things.

To sum this up, we'd have three code paths for calling _updateDelegationToken_:
1) When the TM registers, we push the token to it (if the DTM already has one)
2) When the DTM obtains a new token, it broadcasts it to all currently connected TMs
3) When a TM gets out of sync, the DTM would reconcile its state

WDYT?

Best,
D.

On Wed, Jan 26, 2022 at 9:03 PM David Morávek <d...@apache.org> wrote:

Thanks for the update, I'll go over it tomorrow.
On Wed, Jan 26, 2022 at 5:33 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Hi All,

Since it has turned out that DTM can't be added as a member of JobMaster <https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176>, I've come up with a better proposal. David, thanks for pointing this out; you've caught a bug in the early phase!

Namely, ResourceManager <https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124> is a single-instance class where DTM can be added as a member variable. It has a list of all already registered TMs, and new TM registration also happens here.
To be more specific, the following can be added from a logic perspective:
* Create a new DTM instance in ResourceManager <https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124> and start it (a recurring thread to obtain new tokens)
* Add a new function named "updateDelegationTokens" to TaskExecutorGateway <https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java#L54>
* Call "updateDelegationTokens" on all registered TMs to propagate new DTs
* In case of a new TM registration, call "updateDelegationTokens" before the registration succeeds, to set up the new TM properly

This way:
* only a single DTM would live within a cluster, which is the expected behavior
* DTM is going to be added to a central place where all deployment targets can make use of it
* DTs are going to be pushed to TMs, which would generate less network traffic than a pull-based approach (please see my previous mail where I've described both approaches)
* the HA scenario is going to be consistent, because such a solution <https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1069> can be added to "updateDelegationTokens"

@David or all others, please share whether you agree with this or have a better idea/suggestion.

BR,
G

On Tue, Jan 25, 2022 at 11:00 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

First of all, thanks for investing your time and helping me out. As I see, you have pretty solid knowledge in the RPC area. I would like to rely on your knowledge since I'm still learning this part.

> - Do we need to introduce a new RPC method or can we for example piggyback on heartbeats?

I'm fine with either solution, but one thing is important conceptually. There are fundamentally 2 ways tokens can be updated:
- Push way: when there are new DTs, the JM JVM pushes them to the TM JVMs. This is the preferred one, since only a tiny amount of control logic is needed.
- Pull way: each TM polls the JM to find out whether there are new tokens, and each TM decides on its own whether its DTs need to be updated or not.
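The push approach, together with the retry behavior Gabor describes further down in this message, can be sketched roughly as follows. `TokenReceiver` is a hypothetical stand-in for the proposed `updateDelegationTokens` RPC on `TaskExecutorGateway`, `PushScheduler` is likewise hypothetical, and the two constructor arguments are assumed to mirror `security.kerberos.tokens.renewal-ratio` and `security.kerberos.tokens.retry-wait` as named in this thread.

```java
import java.time.Duration;
import java.util.Collection;

/** Hypothetical stand-in for the proposed TaskExecutorGateway#updateDelegationTokens RPC. */
interface TokenReceiver {
    void updateDelegationTokens(byte[] tokens) throws Exception;
}

/** Hypothetical push-side scheduling logic of the DTM (not Flink code). */
final class PushScheduler {
    private final double renewalRatio; // mirrors security.kerberos.tokens.renewal-ratio, e.g. 0.75
    private final Duration retryWait;  // mirrors security.kerberos.tokens.retry-wait, e.g. 1 hour

    PushScheduler(double renewalRatio, Duration retryWait) {
        this.renewalRatio = renewalRatio;
        this.retryWait = retryWait;
    }

    /** Delay before re-obtaining tokens after a fully successful push. */
    Duration renewalDelay(Duration tokenLifetime) {
        return Duration.ofMillis((long) (tokenLifetime.toMillis() * renewalRatio));
    }

    /**
     * Pushes freshly obtained tokens to every registered TM and returns the delay until the next
     * obtain attempt: retryWait if at least one TM failed, otherwise the renewal delay.
     */
    Duration pushToAll(byte[] tokens, Duration tokenLifetime, Collection<TokenReceiver> registeredTms) {
        boolean allSucceeded = true;
        for (TokenReceiver tm : registeredTms) {
            try {
                tm.updateDelegationTokens(tokens);
            } catch (Exception e) {
                allSucceeded = false; // keep pushing to the remaining TMs, retry later
            }
        }
        return allSucceeded ? renewalDelay(tokenLifetime) : retryWait;
    }
}
```

With a 1-day token lifetime and a ratio of 0.75, `renewalDelay` yields 18 hours; if any TM rejects the push, the next attempt is scheduled after the retry wait instead (1 hour by default, per the thread).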
As you've mentioned, here some ID would need to be generated, which would cause quite some additional network traffic that can definitely be avoided. As a final thought, in Spark we had this kind of DT propagation logic, and we had major issues with it.

So all in all, DTM needs to obtain new tokens, and there must be a way to send this data from the JM to all TMs.

> - What delivery semantics are we looking for? (what if we're only able to update a subset of TMs / what happens if we exhaust retries / should we even have the retry mechanism whatsoever) - I have a feeling that somehow leveraging the existing heartbeat mechanism could help to answer these questions

Let's go through these questions one by one.

> What delivery semantics are we looking for?

DTM must receive an exception when at least one TM was not able to get DTs.

> what if we're only able to update a subset of TMs?

In such a case, DTM will reschedule obtaining tokens after the "security.kerberos.tokens.retry-wait" time.

> what happens if we exhaust retries?

There is no limit on the number of retries.
In the default configuration, tokens need to be re-obtained after one day. DTM tries to obtain new tokens after 1 day * 0.75 (security.kerberos.tokens.renewal-ratio) = 18 hours. When that fails, it retries after "security.kerberos.tokens.retry-wait", which is 1 hour by default. If it never succeeds, an authentication error is going to happen on the TM side and the workload is going to stop.

> should we even have the retry mechanism whatsoever?

Yes, because there are always temporary cluster issues.

> What does it mean for the running application (what does this look like from the user perspective)? As far as I remember the logs are only collected ("aggregated") after the container is stopped, is that correct?

With the default config it works like that, but YARN can be forced to aggregate at specific intervals. A useful feature is forcing YARN to aggregate logs while the job is still running. For long-running jobs such as streaming jobs, this is invaluable. To do this, yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds must be set to a non-negative value.
When this is set, a timer will be set for the given duration, and whenever that timer goes off, log aggregation will run on new files.

> I think this topic should get its own section in the FLIP (having some cross reference to a YARN ticket would be really useful, but I'm not sure if there are any).

I think this is important knowledge, but this FLIP does not touch the already existing behavior. DTs are set on the AM container and are renewed by YARN until that's no longer possible. No new code is going to change this limitation. BTW, there is no jira for this. If you think it's worth writing this down, then I think the right place is the official security doc, as a caveat.

> If we split the FLIP into the two parts / sections that I've suggested, I don't really think that you need to explicitly test for each deployment scenario / cluster framework, because the DTM part is completely independent of the deployment target.
> Basically this is what I'm aiming for with "making it work with the standalone" (as simple as starting a new java process) Flink first (which is also how most people deploy streaming applications on k8s and the direction we're pushing forward with the auto-scaling / reactive mode initiatives).

I see your point and agree with the main direction. k8s is the megatrend which most people will use sooner or later. I'm not 100% sure what kind of split you suggest, but in my view the main target is to add this feature, and I'm open to any logical work ordering. Please share the specific details and we'll work it out...

G

On Mon, Jan 24, 2022 at 3:04 PM David Morávek <d...@apache.org> wrote:

> Could you point to the code where you think it could be added exactly? A helping hand is welcome here 🙂

I think you can take a look at _ResourceManagerPartitionTracker_ [1], which seems to have somewhat similar properties to the DTM.

One topic that needs to be addressed there is what the RPC with the _TaskExecutorGateway_ should look like:
- Do we need to introduce a new RPC method or can we for example piggyback on heartbeats?
- What delivery semantics are we looking for? (what if we're only able to update a subset of TMs / what happens if we exhaust retries / should we even have the retry mechanism whatsoever) - I have a feeling that somehow leveraging the existing heartbeat mechanism could help to answer these questions

> In short, after the DT reaches its max lifetime, log aggregation stops

What does it mean for the running application (what does this look like from the user perspective)? As far as I remember the logs are only collected ("aggregated") after the container is stopped, is that correct? I think this topic should get its own section in the FLIP (having some cross reference to a YARN ticket would be really useful, but I'm not sure if there are any).

> All deployment modes (per-job, per-app, ...) are planned to be tested and are expected to work with the initial implementation; however, not all deployment targets (k8s, local, ...
If we split the FLIP into the two parts / sections that I've suggested, I don't really think that you need to explicitly test for each deployment scenario / cluster framework, because the DTM part is completely independent of the deployment target. Basically this is what I'm aiming for with "making it work with the standalone" (as simple as starting a new java process) Flink first (which is also how most people deploy streaming applications on k8s and the direction we're pushing forward with the auto-scaling / reactive mode initiatives).

The whole integration with YARN (let's forget about log aggregation for a moment) / k8s-native only boils down to how we make the keytab file local to the JobManager so the DTM can read it, so it's basically built on top of that. The only special thing that needs to be tested there is the "keytab distribution" code path.

[1] https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java

Best,
D.
On Mon, Jan 24, 2022 at 12:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> There is a separate JobMaster for each job within a Flink cluster and each JobMaster only has a partial view of the task managers

Good point! I've had a deeper look and you're right. We definitely need to find another place.

> Related per-cluster or per-job keytab:

In the current code a per-cluster keytab is implemented, and I intend to keep it like this within this FLIP. The reason is simple: tokens on the TM side can be stored within the UserGroupInformation (UGI) structure, which is global. I'm not saying it's impossible to change that, but I think this is a complexity that the initial implementation is not required to contain. Additionally, we've not seen such a need from the user side. If the need arises later on, then another FLIP on this topic can be created and discussed.
Proper multi-UGI handling within a single JVM is a topic where several rounds of deep-dives with the Hadoop/YARN guys are required.

> single DTM instance embedded with the ResourceManager (the Flink component)

Could you point to the code where you think it could be added exactly? A helping hand is welcome here 🙂

> Then the single (initial) implementation should work with all the deployment modes out of the box (which is not what the FLIP suggests). Is that correct?

All deployment modes (per-job, per-app, ...) are planned to be tested and are expected to work with the initial implementation; however, not all deployment targets (k8s, local, ...) are intended to be tested. For each deployment target a new jira needs to be created, where I expect that a small amount of code needs to be added and a relatively expensive testing effort is required.

> I've taken a look into the prototype and in the "YarnClusterDescriptor" you're injecting a delegation token into the AM [1] (that's obtained using the provided keytab).
> If I understand this correctly from the previous discussion / FLIP, this is to support log aggregation and the DT has a limited validity. How is this DT going to be renewed?

You're clever and touched on a limitation which Spark has too. In short, after the DT reaches its max lifetime, log aggregation stops. I had several deep-dive rounds with the YARN guys during my Spark years because I wanted to fill this gap. They couldn't provide us any way to re-inject the newly obtained DT, so in the end I gave up on this.

BR,
G

On Mon, 24 Jan 2022, 11:00 David Morávek, <d...@apache.org> wrote:

Hi Gabor,

There is actually a huge difference between JobManager (process) and JobMaster (job coordinator). The naming is unfortunately a bit misleading here for historical reasons.
There is a separate JobMaster for each job within a Flink cluster, and each JobMaster only has a partial view of the task managers (depending on where the slots for a particular job are allocated). This means that you'll end up with N "DelegationTokenManagers" competing with each other (N = number of running jobs in the cluster).

This makes me think we're mixing two abstraction levels here:

a) Per-cluster delegation tokens
- Simpler approach; it would involve a single DTM instance embedded with the ResourceManager (the Flink component)
b) Per-job delegation tokens
- More complex approach, but could be more flexible from the user side of things.
- Multiple DTM instances that are bound to the JobMaster lifecycle. Delegation tokens are attached to the particular slots that are executing the job tasks instead of the whole task manager (a TM could be executing multiple jobs with different tokens).
- The question is which keytab should be used for the cluster framework to support log aggregation on YARN (an extra keytab? the keytab that comes with the first job?)

I think these are the things that need to be clarified in the FLIP before proceeding.

A follow-up question to get a better understanding of where this should be headed: are there any use cases where a user may want to use a different keytab for each job, or are we fine with using a cluster-wide keytab? If we go with per-cluster keytabs, is it OK that all jobs submitted into this cluster can access it (even future ones)? Could this be a security concern?

> I presume you thought I would implement a new class with the JobManager name. The plan is not that.

I've never suggested such a thing.

> No. As said earlier, DT handling is planned to be done completely in Flink.
> DTM has a renewal thread which re-obtains tokens at the proper time when needed.

Then the single (initial) implementation should work with all the deployment modes out of the box (which is not what the FLIP suggests). Is that correct?

If the cluster framework also requires a delegation token for its inner workings (IMO this only applies to YARN), it might need an extra step (injecting the token into the application master container).

Separating the individual layers in the FLIP (the actual Flink cluster, basically making this work with a standalone deployment, and the "cluster framework", i.e. support for YARN log aggregation) would be useful.

> Reading the linked Spark readme could be useful.

I've read that, but please be patient with the questions: Kerberos is not an easy topic to get into, and I've had very little contact with it in the past.
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176

I've taken a look into the prototype and in the "YarnClusterDescriptor" you're injecting a delegation token into the AM [1] (that's obtained using the provided keytab). If I understand this correctly from the previous discussion / FLIP, this is to support log aggregation, and the DT has a limited validity. How is this DT going to be renewed?

[1] https://github.com/gaborgsomogyi/flink/commit/8ab75e46013f159778ccfce52463e7bc63e395a9#diff-02416e2d6ca99e1456f9c3949f3d7c2ac523d3fe25378620c09632e4aac34e4eR1261

Best,
D.
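To make the DTM renewal thread discussed in this thread a bit more concrete, here is a minimal Java sketch of a renewal loop. Everything in it is hypothetical: `TokenObtainer`, `ObtainedTokens` and `RenewalScheduler` are illustrative names, not Flink classes, and the policy of waiting a fixed fraction (`renewalRatio`) of the remaining token lifetime, with a fixed backoff after a failed attempt, is an assumption rather than the FLIP's design. It only covers tokens that Flink itself re-obtains; it does nothing for the DT injected into the YARN AM for log aggregation.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical source of delegation tokens (not a Flink interface). */
interface TokenObtainer {
    ObtainedTokens obtain() throws Exception;
}

/** Hypothetical bundle: serialized credentials plus the earliest expiry among them. */
final class ObtainedTokens {
    final byte[] serialized;
    final Instant expiresAt;

    ObtainedTokens(byte[] serialized, Instant expiresAt) {
        this.serialized = serialized;
        this.expiresAt = expiresAt;
    }
}

/** Re-obtains tokens on a single background thread before they expire (sketch). */
final class RenewalScheduler {
    private final TokenObtainer obtainer;
    private final Consumer<ObtainedTokens> onNewTokens; // e.g. hand the tokens to registered TMs
    private final double renewalRatio;                  // fraction of remaining lifetime to wait
    private final Duration retryBackoff;                // wait after a failed obtain attempt
    private final Clock clock;
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    RenewalScheduler(TokenObtainer obtainer, Consumer<ObtainedTokens> onNewTokens,
                     double renewalRatio, Duration retryBackoff, Clock clock) {
        if (renewalRatio <= 0.0 || renewalRatio >= 1.0) {
            throw new IllegalArgumentException("renewalRatio must be in (0, 1)");
        }
        this.obtainer = obtainer;
        this.onNewTokens = onNewTokens;
        this.renewalRatio = renewalRatio;
        this.retryBackoff = retryBackoff;
        this.clock = clock;
    }

    /** Delay before the next renewal: a fraction of the time left until expiry, never negative. */
    static Duration nextDelay(Instant now, Instant expiresAt, double ratio) {
        long remainingMs = Math.max(0L, Duration.between(now, expiresAt).toMillis());
        return Duration.ofMillis((long) (remainingMs * ratio));
    }

    void start() {
        scheduleNext(Duration.ZERO); // obtain immediately, then keep renewing
    }

    void stop() {
        executor.shutdownNow();
    }

    private void renew() {
        Duration delay;
        try {
            ObtainedTokens tokens = obtainer.obtain();
            onNewTokens.accept(tokens);
            delay = nextDelay(clock.instant(), tokens.expiresAt, renewalRatio);
        } catch (Exception e) {
            delay = retryBackoff; // e.g. the KDC is temporarily unreachable
        }
        // Avoid a busy loop if the obtained tokens are already expired.
        scheduleNext(delay.isZero() ? retryBackoff : delay);
    }

    private void scheduleNext(Duration delay) {
        try {
            executor.schedule(this::renew, delay.toMillis(), TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // stop() was called concurrently; no further renewals.
        }
    }
}
```

With a `renewalRatio` of 0.75, a token that still has 100 seconds of validity would be re-obtained after 75 seconds, leaving headroom for a slow or briefly unavailable KDC.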
On Fri, Jan 21, 2022 at 9:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Here is the exact class. I'm on mobile, so I hadn't had a look at the exact class name:
https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
That keeps track of the TMs the tokens can be sent to.

> My feeling would be that we shouldn't really introduce a new component with a custom lifecycle, but rather we should try to incorporate this into existing ones.

Can you be more specific? I presume you thought I would implement a new class with the JobManager name. The plan is not that.

> If I understand this correctly, this means that we then push the token renewal logic to YARN.

No.
As said earlier, DT handling is planned to be done completely in Flink. DTM has a renewal thread which re-obtains tokens at the proper time when needed. YARN log aggregation is a totally different feature, where YARN does the renewal. Log aggregation was an example of why the code can't be 100% reusable for all resource managers. Reading the linked Spark readme could be useful.

G

On Fri, 21 Jan 2022, 21:05 David Morávek, <d...@apache.org> wrote:

> JobManager is the Flink class.

There is no such class in Flink. The closest thing to the JobManager is a ClusterEntrypoint. The cluster entrypoint spawns a new RM Runner & Dispatcher Runner that start participating in the leader election. Once they gain leadership, they spawn the actual underlying instances of these two "main components".
My feeling would be that we shouldn't really introduce a new component with a custom lifecycle, but rather we should try to incorporate this into existing ones.

My biggest concerns would be:

- What would the lifecycle of the new component look like with regard to HA setups? If we really decide to introduce a completely new component, how should this work in case of multiple JobManager instances?
- Which components does it talk to, and how? For example, what does the broadcast of a new token to task managers (TaskManagerGateway) look like? Do we simply introduce a new RPC on the ResourceManagerGateway that broadcasts it, or does the new component need to do some kind of bookkeeping of the task managers it needs to notify?

> YARN-based HDFS log aggregation would not work if we dropped that code.
> Just to be crystal clear, the actual implementation contains this for exactly this reason.

This is the missing part, +1. If I understand this correctly, this means that we then push the token renewal logic to YARN. How do you plan to implement the renewal logic on k8s?

D.

On Fri, Jan 21, 2022 at 8:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> I think we might both mean something different by the RM.

You sensed it right; I didn't specify these terms well in the explanation. By RM I meant the resource management framework. JobManager is the Flink class. This means that inside the JM instance there will be a DTM instance, so they would have the same lifecycle. Hope I've answered the question.
> If we have tokens available on the client side, why do we need to set them into the AM (YARN-specific concept) launch context?

YARN-based HDFS log aggregation would not work if we dropped that code. Just to be crystal clear, the actual implementation contains this for exactly this reason.

G

On Fri, 21 Jan 2022, 20:12 David Morávek, <d...@apache.org> wrote:

Hi Gabor,

> 1. One thing is important: token management is planned to be done generically within Flink and not scattered in RM-specific code. JobManager has a DelegationTokenManager which obtains tokens from time to time (if configured properly). JM knows which TaskManagers are in place, so it can distribute them to all TMs. That's it basically.
I think we might both mean something different by the RM. JobManager is basically just a process encapsulating multiple components, one of which is a ResourceManager, which is the component that manages task manager registrations [1]. There is more or less a single implementation of the RM with pluggable drivers for the active integrations (YARN, k8s).

It would be great if you could share more details of how exactly the DTM is going to fit into the current JM architecture.

> 2. 99.9% of the code is generic, but each RM handles tokens differently. A good example is that YARN obtains tokens on the client side and then sets them on the newly created AM container launch context.
> This is purely YARN-specific and can't be spared. With my current plans, standalone can be changed to use the framework. By using it I mean that no RM-specific DTM or whatsoever is needed.

If we have tokens available on the client side, why do we need to set them into the AM (YARN-specific concept) launch context? Why can't we simply send them to the JM, e.g. as a parameter of the job submission or via a separate RPC call? There might be something I'm missing due to limited knowledge, but handling the token on the "cluster framework" level doesn't seem necessary.
>> > > > > > > >> >> > > > > > > >> > > > > > > >> >> > > > > > > [1] >> > > > > > > >> >> > > > > > > >> > > > > > > >> >> > > > > > > >> > > > > > > >> >> > > > > > >> > > > > > > >> >> > > > > >> > > > > > > >> >> > > > >> > > > > > > >> >> > > >> > > > > > > >> >> > >> > > > > > > >> >> >> > > > > > > >> >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#jobmanager >> > > > > > > >> >> > > > > > > >> > > > > > > >> >> > > > > > > Best, >> > > > > > > >> >> > > > > > > D. >> > > > > > > >> >> > > > > > > >> > > > > > > >> >> > > > > > > On Fri, Jan 21, 2022 at 7:48 PM Gabor >> Somogyi < >> > > > > > > >> >> > > > > gabor.g.somo...@gmail.com >> > > > > > > >> >> > > > > > > >> > > > > > > >> >> > > > > > > wrote: >> > > > > > > >> >> > > > > > > >> > > > > > > >> >> > > > > > > > Oh and one more thing. I'm planning to add >> > this >> > > > > > feature >> > > > > > > >> in >> > > > > > > >> >> > small >> > > > > > > >> >> > > > > chunk >> > > > > > > >> >> > > > > > of >> > > > > > > >> >> > > > > > > > PRs because security is super hairy area. >> > That >> > > > way >> > > > > > > >> reviewers >> > > > > > > >> >> > can >> > > > > > > >> >> > > be >> > > > > > > >> >> > > > > > more >> > > > > > > >> >> > > > > > > > easily obtains the concept. >> > > > > > > >> >> > > > > > > > >> > > > > > > >> >> > > > > > > > On Fri, 21 Jan 2022, 18:03 David Morávek, >> < >> > > > > > > >> d...@apache.org> >> > > > > > > >> >> > > wrote: >> > > > > > > >> >> > > > > > > > >> > > > > > > >> >> > > > > > > > > Hi Gabor, >> > > > > > > >> >> > > > > > > > > >> > > > > > > >> >> > > > > > > > > thanks for drafting the FLIP, I think >> > having >> > > a >> > > > > > solid >> > > > > > > >> >> Kerberos >> > > > > > > >> >> > > > > support >> > > > > > > >> >> > > > > > > is >> > > > > > > >> >> > > > > > > > > crucial for many enterprise deployments. 
I have multiple questions regarding the implementation (note that I have very limited knowledge of Kerberos):

1) If I understand it correctly, we'll only obtain tokens in the job manager and then distribute them via RPC (which needs to be secured).

Can you please outline what the communication will look like? Is the DelegationTokenManager going to be a part of the ResourceManager? Can you outline its lifecycle / how it's going to be integrated there?

2) Do we really need YARN / k8s specific implementations? Is it possible to obtain / renew a token in a generic way?
Maybe to rephrase that: is it possible to implement the DelegationTokenManager for standalone Flink? If we're able to solve this point, it could be possible to target all deployment scenarios with a single implementation.

Best,
D.

On Fri, Jan 14, 2022 at 3:47 AM Junfan Zhang <zuston.sha...@gmail.com> wrote:

Hi G

Thanks for your detailed explanation. I've got your thoughts, and anyway this proposal is a great improvement.

Looking forward to your implementation, and I will keep an eye on it. Thanks again.

Best
JunFan.
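David's question of whether a new component would need its own bookkeeping of the task managers it notifies can be illustrated with a small Java sketch. It assumes a hypothetical `TokenReceiverGateway` standing in for whatever RPC a TaskManager would expose (it is not Flink's `TaskExecutorGateway`) and a hypothetical `TokenBroadcaster` that keeps the registered TMs, hands back the current tokens as part of registration, and pushes every renewed set to all registered TMs. The single `synchronized` lock is a simplification: in an RPC endpoint the same ordering would likely come from running these calls on one thread, and a real implementation would call the gateways asynchronously instead of while holding a lock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the RPC a TaskManager would expose (not Flink's TaskExecutorGateway). */
interface TokenReceiverGateway {
    void updateDelegationTokens(byte[] serializedTokens);
}

/** Keeps track of registered TaskManagers and pushes renewed tokens to all of them (sketch). */
final class TokenBroadcaster {
    private final Map<String, TokenReceiverGateway> registered = new HashMap<>();
    private byte[] currentTokens; // null until tokens are first obtained (or forever, if unsecured)

    /** Registers a TM; the current tokens, if any, travel back with the registration response. */
    synchronized Optional<byte[]> register(String taskManagerId, TokenReceiverGateway gateway) {
        registered.put(taskManagerId, gateway);
        return Optional.ofNullable(currentTokens);
    }

    synchronized void unregister(String taskManagerId) {
        registered.remove(taskManagerId);
    }

    /** Stores the renewed tokens and sends them to every registered TM; returns the TMs that failed. */
    synchronized List<String> broadcast(byte[] newTokens) {
        currentTokens = newTokens;
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, TokenReceiverGateway> entry : registered.entrySet()) {
            try {
                entry.getValue().updateDelegationTokens(newTokens);
            } catch (RuntimeException e) {
                failed.add(entry.getKey()); // caller may retry or let the TM re-register
            }
        }
        return failed;
    }
}
```

Because `register` and `broadcast` share one lock, a TM either receives the tokens in its registration response or is already registered when the next broadcast runs, so it cannot miss an update. Returning an `Optional` reflects that an unsecured cluster may have no tokens to hand out at all.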
On Jan 13, 2022, 9:20 PM +0800, Gabor Somogyi <gabor.g.somo...@gmail.com>, wrote:

Just to confirm: keeping "security.kerberos.fetch.delegation-token" has been added to the doc.

BR,
G

On Thu, Jan 13, 2022 at 1:34 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Hi JunFan,

> By the way, maybe this should be added in the migration plan or integration section in the FLIP-211.

Going to add this soon.

> Besides, I have a question about your statement in the Google doc that the KDC will collapse when the cluster reaches 200 nodes.
> Do you have any attachment or reference to prove it?

"KDC *may* collapse under some circumstances" is the proper wording.

We have several customers who are executing workloads on Spark/Flink. Most of the time I'm dealing with their daily issues, which are heavily environment- and use-case-dependent. I've seen various cases:
* where the mentioned ~1k nodes were working fine
* where the KDC thought the requests were coming from a DDoS attack and so discontinued authentication
* where the KDC was simply not responding because of the load
* where the KDC intermittently had some outage (this was the nastiest thing)

Since you're managing a relatively big cluster, you know that the KDC is not only used by Spark/Flink workloads; the whole company IT infrastructure is bombarding it, so whether the KDC reaches its limit really depends on other factors too. Not sure what kind of evidence you are looking for, but I'm not authorized to share any information about our clients' data.

One thing is for sure: the more external system types authenticating through the KDC are used in workloads (e.g. HDFS, HBase, Hive, Kafka), the higher the chance of reaching this threshold when the cluster is big enough.

All in all, this feature is here to help all users never reach this limitation.

BR,
G

On Thu, Jan 13, 2022 at 1:00 PM 张俊帆 <zuston.sha...@gmail.com> wrote:

Hi G

Thanks for your quick reply.
> I think keeping the *security.kerberos.fetch.delegation-token* config and simply using it to disable token fetching is a good idea. By the way, maybe this should be added to the migration plan or integration section of FLIP-211.
>
> Besides, I have a question about the KDC collapsing once the cluster reaches 200 nodes, as you described in the Google doc. Do you have any attachment or reference to back this up? In our internal clusters, a single cluster has more than 1000 nodes and the KDC looks fine. Did I miss or misunderstand something? Please correct me.
>
> Best
> JunFan.
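For the 200-node vs. 1000-node question, here is a rough back-of-envelope sketch of why KDC load grows with both cluster size and the number of Kerberized services. It is a hypothetical model, not anything taken from FLIP-211 or Flink itself: it assumes each client performs one AS exchange for its TGT plus one TGS exchange per service every time it refreshes credentials, and that with centralized delegation token fetching only a single component talks to the KDC and hands tokens to the TaskManagers. `Workload`, its fields and `kdc_requests_per_hour` are illustrative names. The actual KDC capacity threshold is deliberately not modeled, since it depends on all the other traffic hitting the KDC.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Workload:
    """Hypothetical knobs for a back-of-envelope KDC load estimate."""

    task_managers: int  # nodes that would otherwise authenticate to the KDC themselves
    kerberized_services: int  # e.g. HDFS, HBase, Hive, Kafka
    refresh_interval_s: int = 3600  # assumption: how often credentials are re-obtained


def kdc_requests_per_hour(w: Workload, centralized: bool) -> float:
    """Estimate KDC requests per hour under a simplified, assumed model.

    Each client is assumed to do one AS exchange (to get a TGT) plus one
    TGS exchange per Kerberized service every refresh interval.
    """
    # Centralized token fetching: a single component talks to the KDC and
    # distributes delegation tokens to the TaskManagers; otherwise every TM does.
    clients = 1 if centralized else w.task_managers
    requests_per_refresh = 1 + w.kerberized_services  # 1 AS + N TGS exchanges
    refreshes_per_hour = 3600 / w.refresh_interval_s
    return clients * requests_per_refresh * refreshes_per_hour


if __name__ == "__main__":
    for nodes in (200, 1000):
        w = Workload(task_managers=nodes, kerberized_services=4)
        per_node = kdc_requests_per_hour(w, centralized=False)
        central = kdc_requests_per_hour(w, centralized=True)
        print(f"{nodes} TMs: {per_node:.0f} req/h per-node vs {central:.0f} req/h centralized")
```

The only point of the sketch is the scaling: per-node authentication grows linearly with nodes times services, while centralized fetching stays flat, so whether a given cluster size is a problem depends on refresh intervals and on everything else sharing the KDC.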
> On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org, wrote:
>> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ