Thanks Chesnay & Zhu. I will start a new vote thread soon.
Best,
Lijie

Chesnay Schepler <ches...@apache.org> wrote on Wed, Jun 15, 2022 at 15:49:
> To expand a bit for transparency, Zhu Zhu and I had a long discussion
> (literally spanning days) about this FLIP and its relation to
> speculative execution.
>
> The gist of it is that speculative execution doesn't strictly need the
> block list; just _some_ mechanism to select/request slots from other
> nodes. It is however at this time the easiest way to implement it,
> because of the technical debt we have in the scheduler components
> (e.g., the adaptive scheduler not being used for all streaming jobs, or
> the default scheduler not being fully integrated into declarative
> resource management).
> Because of that I was worried that we might expand the API now to unlock
> speculative execution, but then end up not actually requiring it down
> the line (but still being stuck with it).
> This should give us quite a bit more freedom as to how we implement it.
> Which is particularly important because we already identified some
> limitations in the current design (e.g., blocks not being scoped to
> jobs, resourceID-based blocks interfering with hard-coded resource IDs,
> potentially blocking more slots than necessary).
>
> Shall we start a new vote thread, since the design changed quite a bit?
>
> On 15/06/2022 09:33, Zhu Zhu wrote:
> > Hi everyone,
> > Thank you all for the feedback!
> >
> > We received concerns that the blocklist feature is not strongly
> > required except for the needs of speculative execution. So as the
> > first step, we will limit the scope of FLIP-224 to only support
> > speculative execution, and therefore not add public interfaces for
> > users to manipulate the blocklist directly.
> >
> > If later we receive strong requirements for a blocklist from users,
> > we will have another FLIP to open it to users with well-designed
> > public interfaces and web UI.
> >
> > Thanks,
> > Zhu
> >
> > Zhu Zhu <reed...@gmail.com> wrote on Fri, Jun 10, 2022 at 18:45:
> >> 1) With the declarative slot allocation protocol, it's not easy to
> >> keep the slow slots (which may have satisfied the slot request
> >> already) while asking for more slots from the resource manager.
> >> We are also hesitant to use detected slow slots, which may add load
> >> to the slow nodes and further slow down the tasks on them, because
> >> we have found that slow nodes are mostly caused by heavy loads.
> >>
> >> 2) I see your point. The blocker here is that batch jobs currently
> >> use the DefaultScheduler. The scheduler does not see slots directly
> >> and does not know which of them are from slow nodes, so it's hard
> >> for it to rescale the vertex according to this information. Besides
> >> that, it cannot help with the stage that first observed and was
> >> affected by the slow nodes.
> >>
> >> Thanks,
> >> Zhu
> >>
> >> Chesnay Schepler <ches...@apache.org> wrote on Fri, Jun 10, 2022 at 17:04:
> >>> 1)
> >>> It's true that if we handle this entirely in the scheduler we may
> >>> get a bunch of slow slots from the RM.
> >>> My point is that this isn't necessarily a problem; it depends on
> >>> how you handle those slots.
> >>>
> >>> We anyway need to account for the possibility that we're not
> >>> getting any new fast slots from the RM.
> >>> With that in mind, I'd rather not categorically throw away the slow
> >>> slots we got, but try to make use of them somehow.
> >>>
> >>> 2)
> >>> The idea was to rescale the job when the current stage finishes.
> >>> The speculative execution handles slow nodes being detected in the
> >>> current stage; in the next stage we use _some_ strategy to handle
> >>> slow nodes (be it ignoring those, rescaling the job, ...).
> >>>
> >>> 3)
> >>> It's a fair observation that once you push this to the RM you end
> >>> up with a de-facto blocklist :)
> >>>
> >>> On 08/06/2022 17:11, Zhu Zhu wrote:
> >>>
> >>> Thanks Chesnay for the feedback in the vote thread.
> >>> (https://lists.apache.org/thread/opc7jg3rpxnwotkb0fcn4wnm02m4397o)
> >>>
> >>> I'd like to continue the discussion in this thread so that the
> >>> discussions can be better tracked.
> >>>
> >>> Regarding your questions, here are my thoughts:
> >>> 1. The current locality mechanism does not work well to avoid
> >>> deploying tasks to slow nodes because it cannot proactively reject
> >>> or release slots from the slow nodes. And it cannot help at the
> >>> resource manager side to avoid allocating free slots or launching
> >>> new TaskManagers on slow nodes.
> >>> 2. Dynamically downscaling or upscaling a batch job is usually
> >>> unacceptable because it means re-running the whole stage.
> >>> 3. Extending the requirement declaration to have a notion of
> >>> "undesirable nodes" is an option, and it is actually how we
> >>> started. Considering implementation details, we found we need:
> >>> - a tracker to collect all the undesirable nodes (detected slow nodes)
> >>> - to filter out slots from undesirable nodes when allocating slots
> >>> from the SlotPool
> >>> - to ask the ResourceManager for slots that are not on the
> >>> undesirable nodes. The ResourceManager may further need to ask for
> >>> new TaskManagers that are not on the undesirable nodes.
> >>> With all this functionality in place, we found that we almost have
> >>> a blocklist mechanism. As the blocklist mechanism is a common
> >>> concept and is possible to benefit users, we took this chance to
> >>> propose it.
> >>> 4. A cluster-wide shared blocklist is not a must for speculative
> >>> execution at the moment. It is mainly part of a standalone
> >>> blocklist feature to host user-specified block items. To avoid
> >>> sharing job-specific blocked items between jobs, one way is to add
> >>> a nullable JobID tag to the blocked item.
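The pieces listed above (a tracker for undesirable nodes, plus slot filtering on the SlotPool side) are straightforward to prototype. The following is a purely illustrative Python sketch; the class and field names are invented for this mail thread and do not correspond to Flink's actual Java classes.

```python
import time

class UndesirableNodeTracker:
    """Illustrative tracker for undesirable (slow/blocked) nodes.
    An item may carry an end timestamp after which it expires."""

    def __init__(self):
        self._blocked = {}  # node_id -> end_timestamp (None means permanent)

    def block(self, node_id, end_timestamp=None):
        self._blocked[node_id] = end_timestamp

    def is_blocked(self, node_id, now=None):
        if node_id not in self._blocked:
            return False
        end = self._blocked[node_id]
        now = time.time() if now is None else now
        return end is None or now < end

    def remove_timeout_items(self, now):
        """Drop expired items; driven by an external timer, not an own thread."""
        self._blocked = {n: end for n, end in self._blocked.items()
                         if end is None or now < end}

def filter_slots(free_slots, tracker, now=None):
    """SlotPool-side filtering: never hand out slots on undesirable nodes."""
    return [s for s in free_slots if not tracker.is_blocked(s["node"], now)]
```

The same predicate would back the ResourceManager-side behavior: when no acceptable slot remains after filtering, the RM asks for new TaskManagers off the undesirable nodes.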
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Zhu Zhu <reed...@gmail.com> wrote on Tue, Jun 7, 2022 at 10:58:
> >>>
> >>> Hi Chesnay,
> >>>
> >>> Would you please take a look at the FLIP and discussion to see if
> >>> all your concerns have been addressed?
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Zhu Zhu <reed...@gmail.com> wrote on Sat, May 28, 2022 at 13:26:
> >>>
> >>> Regarding the concern about the SlotManager, my two cents:
> >>> 1. It is necessary for the SlotManager to host blocked slots, in 2 cases:
> >>> a. In standalone mode, a taskmanager may be temporarily added to
> >>> the blocklist. We do not want the TM to get disconnected and shut
> >>> down, so we need to keep its connection to the RM and keep hosting
> >>> its slots.
> >>> b. When we want to avoid allocating slots on slow nodes but do not
> >>> want to kill the currently running tasks on those nodes
> >>> (MARK_BLOCKED mode).
> >>>
> >>> There is possibly a way to keep the connection of a blocked task
> >>> manager while hiding its slots from the SlotManager, but I feel it
> >>> may be even more complicated.
> >>>
> >>> 2. It will not complicate the SlotManager too much. The SlotManager
> >>> will be offered a BlockedTaskManagerChecker when created, and just
> >>> needs to use it to filter out blocked slots on slot requests.
> >>> Therefore I think the complication is acceptable.
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, May 25, 2022 at 15:26:
> >>>
> >>> Hi everyone,
> >>>
> >>> I've updated the FLIP according to Chesnay's feedback; the changes
> >>> are as follows:
> >>> 1. Changed the GET result to a map.
> >>> 2. Only kept *endTimestamp* in the ADD operation, and changed the
> >>> rest (from POST) to PUT.
> >>> 3. Introduced a new slot pool implementation (BlocklistSlotPool) to
> >>> encapsulate blocklist-related functions.
> >>> 4. Removed *mainThread* from BlocklistTracker; instead, provided a
> >>> *removeTimeoutItems* method to be called by outside components.
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Lijie Wang <wangdachui9...@gmail.com> wrote on Mon, May 23, 2022 at 22:51:
> >>>
> >>> Hi Chesnay,
> >>>
> >>> Thanks for the feedback.
> >>>
> >>> 1. Regarding the TM/Node id: do you mean special characters may
> >>> appear in the REST URL? Actually, I don't think so. The task
> >>> manager id in the REST API should be the *ResourceID* of the
> >>> taskmanager in Flink; there should be no special characters, and
> >>> some existing REST APIs are already using it, e.g.
> >>> GET: http://{jm_rest_address:port}/taskmanagers/<taskmanagerid>.
> >>> The node id should be an IP of a machine or a node name in
> >>> Yarn/Kubernetes; I think it should also have no special characters.
> >>> 2. Regarding the GET query responses: I agree with you, it makes
> >>> sense to change the GET result to a map.
> >>>
> >>> 3. Regarding the endTimestamp: I also agree with you, endTimestamp
> >>> can cover everything, and the endTimestamp is a unix timestamp, so
> >>> there should be no timezone issues. But I think PUT and DELETE are
> >>> enough, no PATCH. The add REST API is add-or-update; PUT can cover
> >>> this semantics.
> >>>
> >>> 4. Regarding the slot pool/manager: I don't think the current
> >>> slotpool and slotmanager are able to support the MARK_BLOCKED
> >>> (slots that are already allocated will not be affected) action. The
> >>> reasons are as follows:
> >>>
> >>> a) For the slot pool, with the MARK_BLOCKED action, when a slot
> >>> state changes from reserved (task assigned) to free (no task
> >>> assigned), it is necessary to check whether the slot should be
> >>> released immediately (it should be released immediately if the task
> >>> manager is blocked, otherwise it may be allocated to other tasks).
> >>> I think this cannot be supported without being aware of the
> >>> blocklist information.
> >>> Compared to the solution in the FLIP, a more appropriate/preferred
> >>> way may be: introduce a new slot pool implementation for the
> >>> blocklist (maybe named BlocklistSlotPool; it extends/wraps the
> >>> original slot pool), and implement the parts that need to be aware
> >>> of the blocklist in this newly introduced slot pool, so the
> >>> original slot pool basically does not need to change.
> >>>
> >>> b) For the slot manager, with the MARK_BLOCKED action, there may be
> >>> free but blocked slots in the slot manager (the corresponding TMs
> >>> cannot be released/unregistered because there are still running
> >>> tasks on them). Therefore, we need to filter out the blocked slots
> >>> when trying to fulfill the slot requirements, so it also needs to
> >>> know the blocklist information.
> >>> A better way may be to abstract a resource allocation strategy,
> >>> make the blocklist a special implementation, and then pass the
> >>> resource allocation strategy in when constructing the slot manager.
> >>> Unfortunately, the data structures in the two existing slot manager
> >>> implementations (*DeclarativeSlotManager* and
> >>> *FineGrainedSlotManager*) are quite different, and it is not easy
> >>> to abstract a common resource allocation strategy, so we prefer to
> >>> keep the current way (i.e. pass the blocklist information directly
> >>> into the slot manager).
> >>>
> >>> 5. Regarding the BlocklistTracker: I also agree with you, the
> >>> BlocklistTracker does not need to be aware of the executor, and the
> >>> timeout actions can be done outside.
> >>>
> >>> Chesnay Schepler <ches...@apache.org> wrote on Fri, May 20, 2022 at 17:34:
> >>>
> >>> I have a number of concerns:
> >>>
> >>> Is the id used for deleting an item the same sent in the initial
> >>> request (and not one returned by Flink)?
> >>> I'm very concerned that the tm/node id can contain special characters.
> >>>
> >>> The GET query should return a map, not a list of items.
> >>> This makes it easier to work with.
> >>>
> >>> The duality of endTimestamp and duration is also concerning.
> >>> If we conclude that endTimestamps can in fact work (and aren't
> >>> utterly unusable due to timezones),
> >>> then this should be able to cover everything and rid us of some
> >>> complexity w.r.t. POSTs to the same ID.
> >>> Additions would be a PUT, changes a PATCH, deletes a DELETE.
> >>>
> >>> I also dislike how we're pushing more functionality into the
> >>> slotpool/-manager.
> >>> These components are complex enough as-is, and instead I'd propose
> >>> a separate component that interacts with the SlotPool/-Manager
> >>> instead, for example by removing the slots from that TM.
> >>> The reason being that from the slot-pool perspective it is
> >>> irrelevant whether a slot is gone because the TM was lost, or
> >>> because it was blocked.
> >>>
> >>> The BlocklistTracker shouldn't be modeled as a component that is
> >>> aware of the concept of main threads.
> >>> It really has no business knowing that; all it needs is an executor
> >>> for handling timeouts/periodic actions,
> >>> and a way to interact with the JM/RM (which internally can handle
> >>> the scheduling into the main thread).
> >>>
> >>> On 20/05/2022 07:20, Lijie Wang wrote:
> >>>
> >>> Hi everyone,
> >>>
> >>> I have started a vote for this FLIP [1]. Please cast your vote
> >>> there or ask additional questions here.
> >>>
> >>> [1] https://lists.apache.org/thread/3416vks1j35co9608gkmsoplvcjjz7bg
> >>>
> >>> Best, Lijie
> >>>
> >>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, May 19, 2022 at 17:34:
> >>>
> >>> Hi Konstantin,
> >>>
> >>> We found that the Flink REST URL does not support the ":merge"
> >>> format, which will be recognized as a parameter in the URL (due to
> >>> starting with a colon).
> >>> We will keep the previous way, i.e.
> >>>
> >>> POST: http://{jm_rest_address:port}/blocklist/taskmanagers
> >>> and the "id" and "merge" flag are put into the request body.
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, May 18, 2022 at 09:35:
> >>>
> >>> Hi Weihua,
> >>> thanks for the feedback.
> >>>
> >>> 1. Yes, only *Manually* is supported in this FLIP, but it's the
> >>> first step towards auto-detection.
> >>> 2. We will print the blocked nodes in the logs, and maybe also put
> >>> them into the exception for insufficient resources.
> >>> 3. No. This FLIP won't change the WebUI. The blocklist information
> >>> can be obtained through the REST API and metrics.
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Weihua Hu <huweihua....@gmail.com> wrote on Tue, May 17, 2022 at 21:41:
> >>>
> >>> Hi,
> >>> Thanks for creating this FLIP.
> >>> We have implemented an automatic blocklist detection mechanism
> >>> internally, which is indeed very effective for handling node
> >>> failures.
> >>> Due to the large number of nodes, although SREs already support
> >>> automatically taking failed nodes offline, the detection is not
> >>> 100% accurate and there is some delay.
> >>> So the blocklist mechanism can make Flink jobs recover from
> >>> failures much faster.
> >>>
> >>> Here are some of my thoughts:
> >>> 1. In this FLIP, it needs users to locate machine failures
> >>> manually, which has a certain cost of use.
> >>> 2. What happens if too many nodes are blocked, resulting in
> >>> insufficient resources? Will there be a special Exception for the
> >>> user?
> >>> 3. Will we display the blocklist information in the WebUI? The
> >>> blocklist is at cluster level, and if multiple users share a
> >>> cluster, some users may be a little confused when resources are not
> >>> enough, or when more resources are applied for.
> >>>
> >>> Also, looking forward to the next FLIP on auto-detection.
> >>>
> >>> Best,
> >>> Weihua
> >>>
> >>> On May 16, 2022 at 23:22, Lijie Wang <wangdachui9...@gmail.com> wrote:
> >>>
> >>> Hi Konstantin,
> >>>
> >>> Maybe change it to the following:
> >>>
> >>> 1. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}
> >>> Merge is not allowed. If the {id} already exists, return an error.
> >>> Otherwise, create a new item.
> >>>
> >>> 2. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge
> >>> Merge is allowed. If the {id} already exists, merge. Otherwise,
> >>> create a new item.
> >>>
> >>> WDYT?
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Konstantin Knauf <kna...@apache.org> wrote on Mon, May 16, 2022 at 20:07:
> >>>
> >>> Hi Lijie,
> >>>
> >>> hm, maybe the following is more appropriate in that case:
> >>>
> >>> POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge
> >>>
> >>> Best,
> >>>
> >>> Konstantin
> >>>
> >>> On Mon, May 16, 2022 at 07:05, Lijie Wang <wangdachui9...@gmail.com> wrote:
> >>>
> >>> Hi Konstantin,
> >>> thanks for your feedback.
> >>>
> >>> From what I understand, PUT should be idempotent. However, we have
> >>> a *timeout* field in the request. This means that initiating the
> >>> same request at two different times will lead to different resource
> >>> states (the timestamps of the items to be removed will be
> >>> different).
> >>>
> >>> Should we use PUT in this case? WDYT?
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Konstantin Knauf <kna...@apache.org> wrote on Fri, May 13, 2022 at 17:20:
> >>>
> >>> Hi Lijie,
> >>>
> >>> wouldn't the REST-API-idiomatic way for an update/replace be a PUT
> >>> on the resource?
> >>>
> >>> PUT: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}
> >>>
> >>> Best,
> >>>
> >>> Konstantin
> >>>
> >>> On Fri, May 13, 2022 at 11:01, Lijie Wang <wangdachui9...@gmail.com> wrote:
> >>>
> >>> Hi everyone,
> >>>
> >>> I've had an offline discussion with Becket Qin and Zhu Zhu, and
> >>> made the following changes to the REST API:
> >>> 1. To avoid ambiguity, only one of *timeout* and *endTimestamp* may
> >>> be specified. If both are specified, an error will be returned.
> >>> 2. If the specified item is already there, the *ADD* operation has
> >>> two behaviors: *return error* (the default) or *merge/update*, and
> >>> we add a flag to the request body to control it. You can find more
> >>> details in the "Public Interface" section.
> >>>
> >>> If there is no more feedback, we will start the vote thread next
> >>> week.
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Lijie Wang <wangdachui9...@gmail.com> wrote on Tue, May 10, 2022 at 17:14:
> >>>
> >>> Hi Becket Qin,
> >>>
> >>> Thanks for your suggestions. I have moved the description of the
> >>> configurations, metrics and REST API into the "Public Interface"
> >>> section, and made a few updates according to your suggestions. In
> >>> this FLIP, there are no public Java interfaces or pluggables that
> >>> users need to implement by themselves.
> >>>
> >>> Answers to your questions:
> >>> 1. Yes, there are 2 block actions: MARK_BLOCKED and
> >>> MARK_BLOCKED_AND_EVACUATE_TASKS (renamed). Currently, block items
> >>> can only be added through the REST API, so these 2 actions are
> >>> mentioned in the REST API part (which has now been moved to the
> >>> public interface section).
> >>> 2. I agree with you.
> >>> I have changed the "Cause" field to String, and allow users to
> >>> specify it via the REST API.
> >>> 3. Yes, it is useful to allow different timeouts. As mentioned
> >>> above, we will introduce 2 fields, *timeout* and *endTimestamp*,
> >>> into the ADD REST API to specify when to remove the blocked item.
> >>> These 2 fields are optional; if neither is specified, it means that
> >>> the blocked item is permanent and will not be removed. If both are
> >>> specified, the minimum of *currentTimestamp + timeout* and
> >>> *endTimestamp* will be used as the time to remove the blocked item.
> >>> To keep the configuration more minimal, we have removed the
> >>> *cluster.resource-blocklist.item.timeout* configuration option.
> >>> 4. Yes, the block item will be overridden if the specified item
> >>> already exists. The ADD operation is *ADD or UPDATE*.
> >>> 5. Yes. On the JM/RM side, all the blocklist information is
> >>> maintained in the JMBlocklistHandler/RMBlocklistHandler. The
> >>> blocklist handler (or abstractions of it) will be propagated to the
> >>> different components.
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Becket Qin <becket....@gmail.com> wrote on Tue, May 10, 2022 at 11:26:
> >>>
> >>> Hi Lijie,
> >>>
> >>> Thanks for updating the FLIP. It looks like the public interface
> >>> section did not fully reflect all the user-sensible behavior and
> >>> API. Can you put everything that users may be aware of there?
> >>> That would include the REST API, metrics, configurations, public
> >>> Java interfaces or pluggables that users may see or implement by
> >>> themselves, as well as a brief summary of the behavior of the
> >>> public API.
> >>>
> >>> Besides that, I have a few questions:
> >>>
> >>> 1. According to the conversation in the discussion thread, it looks
> >>> like the BlockAction will have "MARK_BLOCKLISTED" and
> >>> "MARK_BLOCKLISTED_AND_EVACUATE_TASKS". Is that the case? If so, can
> >>> you add that to the public interface as well?
> >>>
> >>> 2. At this point, the "Cause" field in the BlockingItem is a
> >>> Throwable and is not reflected in the REST API. Should that be
> >>> included in the query response? And should we change that field to
> >>> be a String so users may specify the cause via the REST API when
> >>> they block some nodes / TMs?
> >>>
> >>> 3. Would it be useful to allow users to have different timeouts for
> >>> different blocked items? So while there is a default timeout, users
> >>> can also override it via the REST API when they block an entity.
> >>>
> >>> 4. Regarding the ADD operation: if the specified item is already
> >>> there, will the block item be overridden? For example, if the user
> >>> wants to extend the timeout of a blocked item, can they just issue
> >>> an ADD command again?
> >>>
> >>> 5. I am not quite familiar with the details of this, but is there a
> >>> source of truth for the blocked list?
> >>> I think it might be good to have a single source of truth for the
> >>> blocked list and just propagate that list to different components
> >>> to take the action of actually blocking the resource.
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On Mon, May 9, 2022 at 5:54 PM Lijie Wang <wangdachui9...@gmail.com> wrote:
> >>>
> >>> Hi everyone,
> >>>
> >>> Based on the discussion in the mailing list, I updated the FLIP
> >>> doc; the changes include:
> >>> 1. Changed the description of the motivation section to more
> >>> clearly describe the problem this FLIP is trying to solve.
> >>> 2. Only *Manually* is supported.
> >>> 3. Adopted some suggestions, such as *endTimestamp*.
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Roman Boyko <ro.v.bo...@gmail.com> wrote on Sat, May 7, 2022 at 19:25:
> >>>
> >>> Hi Lijie!
> >>>
> >>> *"a) Probably storing inside Zookeeper/Configmap might be helpful
> >>> here." Can you explain it in detail? I don't fully understand that.
> >>> In my opinion, non-active and active are the same, and no special
> >>> treatment is required.*
> >>>
> >>> Sorry, this was a misunderstanding on my side. I thought we were
> >>> talking about the HA mode (but not about the Active and Standalone
> >>> ResourceManager). And the original question was: how to handle the
> >>> blacklisted node list at the moment of a leader change? Should we
> >>> simply forget about them or try to pre-save that list on remote
> >>> storage?
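Roman's question (whether to pre-save the list on remote storage so it survives a leader change) can be illustrated with a write-through sketch. This is hypothetical Python: a plain dict stands in for ZooKeeper or a Kubernetes ConfigMap, and none of these class names exist in Flink.

```python
class RemoteBlocklistStore:
    """Write-through store; the backend dict stands in for ZooKeeper /
    a ConfigMap shared between leader incarnations."""

    def __init__(self, backend):
        self._backend = backend

    def save(self, node_id, item):
        self._backend[node_id] = item

    def remove(self, node_id):
        self._backend.pop(node_id, None)

    def recover(self):
        return dict(self._backend)

class LeaderBlocklist:
    """On gaining leadership, recover previously blocked items from the store."""

    def __init__(self, store):
        self._store = store
        self._blocked = store.recover()  # restore after a leader change

    def block(self, node_id, cause):
        self._blocked[node_id] = cause
        self._store.save(node_id, cause)  # persist before the next failover

    def is_blocked(self, node_id):
        return node_id in self._blocked
```

The alternative Roman mentions (simply forgetting the list on failover) would correspond to constructing the tracker with an empty backend.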
> >>>
> >>> On Sat, 7 May 2022 at 10:51, Yang Wang <danrtsey...@gmail.com> wrote:
> >>>
> >>> Thanks Lijie and ZhuZhu for the explanation.
> >>>
> >>> I just overlooked the "MARK_BLOCKLISTED". For the task level, it is
> >>> indeed some functionality that external tools (e.g. kubectl taint)
> >>> could not support.
> >>>
> >>> Best,
> >>> Yang
> >>>
> >>> Lijie Wang <wangdachui9...@gmail.com> wrote on Fri, May 6, 2022 at 22:18:
> >>>
> >>> Thanks for your feedback, Jiangang and Martijn.
> >>>
> >>> @Jiangang
> >>> "For auto-detecting, I wonder how to make the strategy and mark a
> >>> node blocked?"
> >>> In fact, we currently plan to not support auto-detection in this
> >>> FLIP. The part about auto-detection may be continued in a separate
> >>> FLIP in the future. Some people have the same concerns as you, and
> >>> the correctness and necessity of auto-detection may require further
> >>> discussion in the future.
> >>> "In session mode, multiple jobs can fail on the same bad node and
> >>> the node should be marked blocked."
> >>> By design, the blocklist information will be shared among all jobs
> >>> in a cluster/session. The JM will sync the blocklist information
> >>> with the RM.
> >>>
> >>> @Martijn
> >>> "I agree with Yang Wang on this."
> >>> As Zhu Zhu and I mentioned above, we think the MARK_BLOCKLISTED
> >>> action (which just limits the load of the node and does not kill
> >>> all the processes on it) is also important, and we think that
> >>> external systems (*yarn rmadmin* or *kubectl taint*) cannot support
> >>> it. So we think it makes sense even if only *manually*.
> >>> "I also agree with Chesnay that magical mechanisms are indeed super
> >>> hard to get right."
> >>> Yes, as you see, Jiangang (and a few others) have the same concern.
> >>> However, we currently plan to not support auto-detection in this
> >>> FLIP, only *manually*. In addition, I'd like to say that the FLIP
> >>> provides a mechanism to support MARK_BLOCKLISTED and
> >>> MARK_BLOCKLISTED_AND_EVACUATE_TASKS; the auto-detection may be done
> >>> by external systems.
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Martijn Visser <mart...@ververica.com> wrote on Fri, May 6, 2022 at 19:04:
> >>>
> >>> "If we only support blocking nodes manually, then I could not see
> >>> the obvious advantages compared with the current SRE approach (via
> >>> *yarn rmadmin* or *kubectl taint*)."
> >>> I agree with Yang Wang on this.
> >>>
> >>> "To me this sounds yet again like one of those magical mechanisms
> >>> that will rarely work just right."
> >>> I also agree with Chesnay that magical mechanisms are indeed super
> >>> hard to get right.
> >>>
> >>> Best regards,
> >>>
> >>> Martijn
> >>>
> >>> On Fri, 6 May 2022 at 12:03, Jiangang Liu <liujiangangp...@gmail.com> wrote:
> >>>
> >>> Thanks for the valuable design. The auto-detection can save us a
> >>> great deal of work. We have implemented a similar feature in our
> >>> internal Flink version. Below is something that I care about:
> >>>
> >>> 1. For auto-detecting, I wonder how to make the strategy and mark a
> >>> node blocked? Sometimes the blocked node is hard to detect; for
> >>> example, the upstream node or the downstream node may be blocked
> >>> when the network is unreachable.
> >>> 2. I see that the strategy is made on the JobMaster side. How about
> >>> implementing similar logic in the resource manager? In session
> >>> mode, multiple jobs can fail on the same bad node and the node
> >>> should be marked blocked. If the job makes the strategy, the node
> >>> may not be marked blocked if the failure count doesn't exceed the
> >>> threshold.
> >>>
> >>> Zhu Zhu <reed...@gmail.com> wrote on Thu, May 5, 2022 at 23:35:
> >>>
> >>> Thank you for all your feedback!
> >>>
> >>> Besides the answers from Lijie, I'd like to share some of my
> >>> thoughts:
> >>>
> >>> 1. Whether to enable an automatic blocklist
> >>> Generally speaking, it is not a goal of FLIP-224.
> >>> The automatic way should be something built upon the blocklist
> >>> mechanism and well decoupled.
> >>> It was designed to be a configurable blocklist strategy, but I
> >>> think we can further decouple it by introducing an abnormal node
> >>> detector, as Becket suggested, which just uses the blocklist
> >>> mechanism once bad nodes are detected. However, it should be a
> >>> separate FLIP with further dev discussions and feedback from users.
> >>> I also agree with Becket that different users have different
> >>> requirements, and we should listen to them.
> >>>
> >>> 2. Is it enough to just take away abnormal nodes externally?
> >>> My answer is no. As Lijie has mentioned, we need a way to avoid
> >>> deploying tasks to temporarily hot nodes. In this case, users may
> >>> just want to limit the load of the node and do not want to kill all
> >>> the processes on it. Another case is the speculative execution [1],
> >>> which may also leverage this feature to avoid starting mirror tasks
> >>> on slow nodes.
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+execution+for+Batch+Job
> >>>
> >>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, May 5, 2022 at 15:56:
> >>>
> >>> Hi everyone,
> >>>
> >>> Thanks for your feedback.
> >>>
> >>> There's one detail that I'd like to re-emphasize here because it
> >>> can affect the value and design of the blocklist mechanism (perhaps
> >>> I should highlight it in the FLIP).
We propose two actions in the FLIP:

1) MARK_BLOCKLISTED: Just mark the task manager or node as blocked. Future slots should not be allocated from the blocked task manager or node, but slots that are already allocated will not be affected. A typical application scenario is mitigating machine hotspots: we want subsequent resource allocations to avoid the hot machine, while tasks currently running on it are left untouched.

2) MARK_BLOCKLISTED_AND_EVACUATE_TASKS: Mark the task manager or node as blocked, and evacuate all tasks on it. Evacuated tasks will be restarted on non-blocked task managers.

Of these two actions, the former better highlights the value of this FLIP, because an external system cannot do that.

Regarding *Manually* vs. *Automatically*, I basically agree with @Becket Qin: different users have different answers. Not all users' deployment environments have a dedicated external system that can perform anomaly detection. In addition, adding pluggable/optional auto-detection doesn't require much extra work on top of manual specification.

I will answer your other questions one by one.
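For readers following along, the two proposed actions could be modeled roughly as below. This is only an illustrative sketch: the two action names come from the discussion, but the enum, its flag, and its methods are hypothetical, not Flink code.

```java
// Illustrative sketch only: action names follow the FLIP discussion; the
// enum structure and the evacuateTasks flag are hypothetical.
class BlocklistActions {
    enum BlocklistAction {
        /** Block future slot allocations; tasks already running are untouched. */
        MARK_BLOCKLISTED(false),
        /** Block future allocations and restart current tasks on other TMs. */
        MARK_BLOCKLISTED_AND_EVACUATE_TASKS(true);

        private final boolean evacuateTasks;

        BlocklistAction(boolean evacuateTasks) {
            this.evacuateTasks = evacuateTasks;
        }

        boolean shouldEvacuateTasks() {
            return evacuateTasks;
        }
    }
}
```

The flag makes the key difference between the two actions explicit: only the second one touches tasks that are already running.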
@Yangze
a) I think you are right, we do not need to expose `cluster.resource-blocklist.item.timeout-check-interval` to users.
b) We can abstract `notifyException` into a separate interface (maybe BlocklistExceptionListener), and the ResourceManagerBlocklistHandler can implement it in the future.

@Martijn
a) I also think the manual blocking should be done by cluster operators.
b) I think manual blocking makes sense, because in my experience users are often the first to perceive machine problems (because of job failover or delay), and they will contact the cluster operators to solve it, or even tell the cluster operators which machine is problematic. From this point of view, the people who really need manual blocking are the users; it's just performed by the cluster operators. So I think manual blocking makes sense.

@Chesnay
We need to touch the logic of the JM/SlotPool, because for MARK_BLOCKLISTED we need to know whether a slot is blocklisted when a task reaches FINISHED/CANCELLED/FAILED. If so, the SlotPool should release the slot directly to avoid assigning other tasks (of this job) to it.
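Lijie's reply to Chesnay, keeping a JM-side copy of the blocklist so the SlotPool can cheaply decide whether to release a slot on a task state transition, might be sketched like this. Every class and method name here is hypothetical, invented for illustration only.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch (names are not from the FLIP): the JobMaster keeps a
// local copy of the blocklist that the ResourceManager syncs to it, so the
// SlotPool can answer "should this slot be released?" without an RPC round
// trip to the RM on every task state transition.
class LocalBlocklistView {
    private final Set<String> blockedTaskManagers = ConcurrentHashMap.newKeySet();

    /** Called when the RM pushes a blocklist update to this JobMaster. */
    void onBlocklistUpdated(Set<String> blockedIds) {
        blockedTaskManagers.clear();
        blockedTaskManagers.addAll(blockedIds);
    }

    /** Cheap local check when a task reaches FINISHED/CANCELLED/FAILED. */
    boolean shouldReleaseSlot(String taskManagerId) {
        return blockedTaskManagers.contains(taskManagerId);
    }
}
```

This is the trade-off Lijie describes: a synced local view costs some consistency complexity but avoids per-transition RPC overhead.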
If we only maintain the blocklist information on the RM, the JM needs to retrieve it by RPC. I think the performance overhead of that is relatively large, so it's worth maintaining the blocklist information on the JM side as well and syncing the two.

@Роман
a) "Probably storing inside Zookeeper/Configmap might be helpful here." Can you explain that in detail? I don't fully understand it. In my opinion, non-active and active setups are the same, and no special treatment is required.
b) I agree with you, the `endTimestamp` makes sense; I will add it to the FLIP.

@Yang
As mentioned above, AFAIK, an external system cannot support the MARK_BLOCKLISTED action.

Looking forward to your further feedback.

Best,
Lijie

Yang Wang <danrtsey...@gmail.com> wrote on Tue, May 3, 2022 at 21:09:

Thanks Lijie and Zhu for creating the proposal.

I want to share some thoughts about Flink cluster operations. In a production environment, the SREs (Site Reliability Engineers) already have many tools to detect unstable nodes, which can take system logs/metrics into consideration.
Then they use graceful decommission in YARN and taints in K8s to prevent new allocations on these unstable nodes. Finally, they evict all the containers and pods running on these nodes. This mechanism also works for planned maintenance. So I am afraid this is not the typical use case for FLIP-224.

If we only support blocking nodes manually, I cannot see an obvious advantage over the current SRE approach (via *yarn rmadmin* or *kubectl taint*). At the very least, we need a pluggable component which could expose potentially unstable nodes automatically and block them if explicitly enabled.

Best,
Yang

Becket Qin <becket....@gmail.com> wrote on Mon, May 2, 2022 at 16:36:

Thanks for the proposal, Lijie.

This is an interesting feature and discussion, and somewhat related to the design principle of how people should operate Flink. I think there are three things involved in this FLIP:

a) Detect and report the unstable node.
b) Collect the information about the unstable node and form a blocklist.
c) Take the action to block nodes.

My two cents:

1. It looks like people all agree that Flink should have c).
It is not only useful in cases of node failures, but also handy for some planned maintenance.

2. People have different opinions on b), i.e. who should be the brain that makes the decision to block a node. I think this largely depends on who we talk to; different users would probably give different answers. For people who do have a centralized node health management service, letting Flink do just a) and c) would be preferred. Essentially, Flink would be one of the sources that may detect unstable nodes, report them to that service, and then take the command from that service to block the problematic nodes. On the other hand, for users who do not have such a service, simply letting Flink be clever by itself and block the suspicious nodes might be desired, to ensure the jobs run smoothly.

So that indicates a) and b) here should be pluggable/optional. In light of this, maybe it would make sense to have something pluggable like an UnstableNodeReporter which exposes unstable nodes actively.
(A more general interface would be JobInfoReporter<T>, which can be used to report any information of type <T>, but I'll just keep the scope relevant to this FLIP here.) Personally speaking, I think it is OK to have a default implementation of a reporter which just tells Flink to take action to block problematic nodes and also unblocks them after a timeout.

Thanks,

Jiangjie (Becket) Qin

On Mon, May 2, 2022 at 3:27 PM Роман Бойко <ro.v.bo...@gmail.com> wrote:

Thanks for the good initiative, Lijie and Zhu! If it's possible, I'd like to participate in the development.

I agree with the 3rd point of Konstantin's reply: we should consider how to move the information about blocklisted nodes/TMs from the active ResourceManager to non-active ones. Probably storing it inside Zookeeper/Configmap might be helpful here.

And I agree with Martijn that a lot of organizations don't want to expose such an API to the cluster user group. But I think it's necessary to have a mechanism for unblocking the nodes/TMs anyway, to avoid incorrect automatic behaviour.
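The pluggable reporter Becket sketches, a detector that actively exposes suspected-unstable nodes, plus a trivial default implementation that just feeds the blocklist, might look as follows. Only the name UnstableNodeReporter appears in the discussion; the method and class shapes are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the pluggable reporter idea: a detector invokes the
// reporter when it suspects a node; a default implementation simply hands the
// node to the blocklist mechanism.
interface UnstableNodeReporter {
    void reportUnstableNode(String nodeId);
}

// Default implementation: remember reported nodes so the blocklist mechanism
// can block them (a real implementation would also unblock after a timeout).
class BlockingReporter implements UnstableNodeReporter {
    final List<String> blockedNodes = new ArrayList<>();

    @Override
    public void reportUnstableNode(String nodeId) {
        if (!blockedNodes.contains(nodeId)) {
            blockedNodes.add(nodeId); // real impl: call the blocklist handler
        }
    }
}
```

Keeping the interface this small is what makes a) and b) pluggable: a centralized health service and Flink's own detector can both sit behind it.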
And one other small suggestion: I think it would be better to extend the *BlocklistedItem* class with an *endTimestamp* field and fill it at item creation. This simple addition will allow us to:
- Provide users the ability to set the exact end time of a blocklist entry through the REST API
- Not be tied to a single value of *cluster.resource-blacklist.item.timeout*

On Mon, 2 May 2022 at 14:17, Chesnay Schepler <ches...@apache.org> wrote:

I do share the concern about blurring the lines a bit. That said, I'd prefer to not have any auto-detection and only have an opt-in mechanism to manually block processes/nodes. To me this sounds yet again like one of those magical mechanisms that will rarely work just right. An external system can leverage way more information after all.

Moreover, I'm quite concerned about the complexity of this proposal: tracking on both the RM and JM side; syncing between components; adjustments to the slot and resource protocols. In a way it seems overly complicated.
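Roman's endTimestamp suggestion above could look roughly like this. The class name BlocklistedItem and the endTimestamp field come from the discussion; the constructor shape and everything else is an illustrative assumption, including the Long.MAX_VALUE handling (which Lijie mentions later as the way to disable the timeout).

```java
// Hypothetical sketch of Roman's suggestion: compute endTimestamp once, at
// item creation, instead of deriving expiry from the single global timeout.
class BlocklistedItem {
    private final String identifier;   // node id or task manager id
    private final long endTimestamp;   // epoch millis at which the block expires

    BlocklistedItem(String identifier, long creationMillis, long timeoutMillis) {
        this.identifier = identifier;
        long end = creationMillis + timeoutMillis;
        // Saturate on overflow so timeout = Long.MAX_VALUE means "never expires".
        this.endTimestamp = end < creationMillis ? Long.MAX_VALUE : end;
    }

    boolean isExpiredAt(long nowMillis) {
        return nowMillis >= endTimestamp;
    }

    String getIdentifier() {
        return identifier;
    }
}
```

With an explicit endTimestamp, a REST handler can either accept the end time directly from the user or derive it from a per-item timeout, which is exactly the flexibility the suggestion is after.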
If we look at it purely from an active resource management perspective, there isn't really a need to touch the slot protocol at all (or in fact anything in the JobMaster), because there isn't any point in keeping blocked TMs around in the first place. They'd just be idling, potentially being shut down by the RM after a while because of it (unless we _also_ touch that logic). Here, blocking a process (be it by blocking the process or the node) is equivalent to shutting down the blocked process(es). Once the block is lifted we can just spin it back up.

And I do wonder whether we couldn't apply the same line of thinking to standalone resource management. There, being able to stop/restart a process/node manually should be a core requirement for a Flink deployment anyway.

On 02/05/2022 08:49, Martijn Visser wrote:

Hi everyone,

Thanks for creating this FLIP. I can understand the problem and I see value in the automatic detection and blocklisting. I do have some concerns with the ability to manually specify resources to be blocked.
I have two concerns:

* Most organizations explicitly have a separation of concerns, meaning that there's a group who's responsible for managing a cluster and a user group who uses that cluster. With the introduction of this mechanism, the latter group can now influence the responsibility of the first group. So it is possible that someone from the user group blocks something, which causes an outage (which could result in paging mechanisms triggering, etc.) that impacts the first group.
* How big is the group of people who can go through the process of manually identifying a node that isn't behaving as it should? I do think this group is relatively limited. Does it then make sense to introduce such a feature, which would only be used by a really small user group of Flink? We still have to maintain, test and support such a feature.
I'm +1 for the auto-detection features, but I'm leaning towards not exposing this to the user group and instead having it available strictly for cluster operators. They could then also set up their paging/metrics/logging systems to take this into account.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Fri, 29 Apr 2022 at 09:39, Yangze Guo <karma...@gmail.com> wrote:

Thanks for driving this, Zhu and Lijie. +1 for the overall proposal. Just sharing some cents here:

- Why do we need to expose cluster.resource-blacklist.item.timeout-check-interval to the user? I think the semantics of `cluster.resource-blacklist.item.timeout` is sufficient for the user. How the timeout is guaranteed is Flink's internal implementation detail. I think it would be very confusing, and we do not need to expose it to users.
- The ResourceManager could notify the `BlacklistHandler` of task manager exceptions as well. For example, slot allocation might fail because the target task manager is busy or has network jitter.
I don't mean we need to cover this case in this version, but we could also open a `notifyException` in `ResourceManagerBlacklistHandler`.
- Before we sync the blocklist to the ResourceManager, will the slots of a blocked task manager continue to be released and allocated?

Best,
Yangze Guo

On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi Konstantin,

Thanks for your feedback. I will respond to your 4 remarks:

1) Thanks for reminding me of the controversy. I think "BlockList" is good enough, and I will change it in the FLIP.

2) Your suggestion for the REST API is a good idea. Based on the above, I would change the REST API as follows:

POST/GET <host>/blocklist/nodes
POST/GET <host>/blocklist/taskmanagers
DELETE <host>/blocklist/node/<identifier>
DELETE <host>/blocklist/taskmanager/<identifier>

3) If a node is blocklisted, it means that all task managers on this node are blocklisted, and all slots on these TMs are unavailable.
This is actually a bit like a TM being lost, but these TMs are not really lost; they are in an unavailable state and still registered in the Flink cluster. They will be available again once the corresponding blocklist item is removed. This behavior is the same in active and non-active clusters. However, in active clusters these TMs may be released due to idle timeouts.

4) For the item timeout, I prefer to keep it. The reasons are as follows:
a) The timeout will not affect users adding or removing items via the REST API, and users can disable it by configuring it to Long.MAX_VALUE.
b) Some node problems can recover after a period of time (such as machine hotspots), in which case users may prefer that Flink handles this automatically instead of requiring the user to do it manually.

Best,
Lijie

Konstantin Knauf <kna...@apache.org> wrote on Wed, Apr 27, 2022 at 19:23:

Hi Lijie,

I think this makes sense, and +1 to only supporting manual blocking of taskmanagers and nodes.
Maybe the different strategies can also be maintained outside of Apache Flink.

A few remarks:

1) Can we use another term than "blacklist" due to the controversy around the term? [1] There was also a Jira ticket about this topic a while back, and there was generally a consensus to avoid the terms blacklist & whitelist [2]. We could use "blocklist", "denylist" or "quarantined".

2) For the REST API, I'd prefer a slightly different design, as verbs like add/remove are often considered an anti-pattern for REST APIs. POST on a list resource is generally the standard way to add items; DELETE on the individual resource is the standard way to remove an item.

POST <host>/quarantine/items
DELETE <host>/quarantine/items/<itemidentifier>

We could also consider separating taskmanagers and nodes in the REST API (and internal data structures). Any opinion on this?

POST/GET <host>/quarantine/nodes
POST/GET <host>/quarantine/taskmanager
DELETE <host>/quarantine/nodes/<identifier>
DELETE <host>/quarantine/taskmanager/<identifier>

3) How would blocking nodes behave with non-active resource managers, i.e. standalone or reactive mode?

4) To keep the implementation even more minimal, do we need the timeout behavior? If items are added/removed manually, we could easily delegate this to the user. In my opinion the timeout behavior would fit better into specific strategies at a later point.

Looking forward to your thoughts.

Cheers and thank you,

Konstantin

[1] https://en.wikipedia.org/wiki/Blacklist_(computing)#Controversy_over_use_of_the_term
[2] https://issues.apache.org/jira/browse/FLINK-18209

On Wed, Apr 27, 2022 at 04:04, Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi all,

Flink job failures may happen due to cluster node issues (insufficient disk space, bad hardware, network abnormalities). Flink will take care of the failures and redeploy the tasks. However, due to data locality and limited resources, the new tasks are very likely to be redeployed to the same nodes, which will result in continuous task abnormalities and affect job progress.
Currently, Flink users need to manually identify the problematic node and take it offline to solve this problem. But this approach has the following disadvantages:

1. Taking a node offline can be a heavy process. Users may need to contact cluster administrators to do this. The operation can even be dangerous and not allowed during some important business events.
2. Identifying and solving this kind of problem manually is slow and a waste of human resources.

To solve this problem, Zhu Zhu and I propose to introduce a blacklist mechanism for Flink to filter out problematic resources. You can find more details in FLIP-224 [1]. Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blacklist+Mechanism

Best,
Lijie

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

--
https://twitter.com/snntrable
https://github.com/knaufk