I can see the benefits of control flow. For example, it might help the old (and inactive) FLIP-17 side input proposal. I would suggest that we add more details on some of the potential use cases.
Here is one mismatch with using control flow for dynamic config. Dynamic
config is typically targeted at, and loaded by, one specific operator, whereas
control flow will propagate the dynamic config to all operators. That is not a
problem per se.

Regarding using the REST API (to the JobManager) for accepting control signals
from external systems: where are we going to persist/checkpoint the signal? The
JobManager can die before the control signal is propagated and checkpointed. Do
we lose the control signal in this case?

On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <tonysong...@gmail.com> wrote:

> +1 on separating the effort into two steps:
>
>    1. Introduce a common control flow framework, with flexible interfaces
>    for generating / reacting to control messages for various purposes.
>    2. Features that leverage the control flow can be worked on
>    concurrently.
>
> Meanwhile, continuing to collect potential features that may leverage the
> control flow should be helpful. It provides good input for the control
> flow framework design, to make the framework common enough to cover the
> potential use cases.
>
> My suggestions on the next steps:
>
>    1. Allow more time for opinions to be heard and potential use cases to
>    be collected.
>    2. Draft a FLIP with the scope of the common control flow framework.
>    3. We probably need a PoC implementation to make sure the framework
>    covers at least the following scenarios:
>       1. Produce control events from arbitrary operators
>       2. Produce control events from JobMaster
>       3. Consume control events from arbitrary operators downstream of
>       where the events are produced
>
> Thank you~
>
> Xintong Song
>
> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <yungao...@aliyun.com> wrote:
>
>> Many thanks, Jiangang, for bringing this up, and many thanks for the
>> discussion!
>>
>> I also agree with the summary by Xintong and Jing that control flow seems
>> to be a common building block for many functionalities, and that a dynamic
>> configuration framework is a representative application that is frequently
>> required by users. Regarding the control flow, we are currently also
>> considering the design of iteration for flink-ml, and as Xintong has
>> pointed out, it also requires control flow in cases like detecting global
>> termination inside the iteration (in this case we need to broadcast an
>> event through the iteration body to detect whether there are still records
>> residing in the iteration body). Regarding whether to implement the dynamic
>> configuration framework, I also agree with Xintong that the consistency
>> guarantee would be a point to consider; we might consider whether we need
>> to ensure that every operator receives the dynamic configuration.
>>
>> Best,
>> Yun
>>
>> ------------------------------------------------------------------
>> Sender: kai wang <yiduwang...@gmail.com>
>> Date: 2021/06/08 11:52:12
>> Recipient: JING ZHANG <beyond1...@gmail.com>
>> Cc: 刘建刚 <liujiangangp...@gmail.com>; Xintong Song [via Apache Flink User
>> Mailing List archive.] <ml+s2336050n44245...@n4.nabble.com>;
>> user <user@flink.apache.org>; dev <d...@flink.apache.org>
>> Subject: Re: Add control mode for flink
>>
>> I'm big +1 for this feature.
>>
>>    1. Limit the input QPS.
>>    2. Change the log level for debugging.
>>
>> In my team, the two examples above are needed.
>>
>> JING ZHANG <beyond1...@gmail.com> wrote on Tue, Jun 8, 2021 at 11:18 AM:
>>
>>> Thanks Jiangang for bringing this up.
>>> As mentioned in Jiangang's email, the `dynamic configuration framework`
>>> provides many useful functions at Kuaishou, because it can update job
>>> behavior without relaunching the job. The functions are very popular at
>>> Kuaishou, and we also see similar demands on the mailing list [1].
>>>
>>> I'm big +1 for this feature.
>>>
>>> Thanks Xintong and Yun for the deep thoughts about the issue. I like the
>>> idea of introducing a control mode in Flink.
>>> It takes the original issue a big step closer to its essence, which also
>>> opens the possibility of more fantastic features, as mentioned in
>>> Xintong's and Jark's responses.
>>> Based on the idea, there are at least two milestones to achieve the
>>> goals proposed by Jiangang:
>>> (1) Build a common control flow framework in Flink.
>>>      It focuses on control flow propagation, and on how to integrate the
>>>      common control flow framework with existing mechanisms.
>>> (2) Build a dynamic configuration framework that is exposed to users
>>>      directly.
>>>      The dynamic configuration framework can be seen as a top-level
>>>      application on the underlying control flow framework.
>>>      It focuses on the public API that receives configuration update
>>>      requests from users. Besides, it is necessary to introduce an API
>>>      protection mechanism to avoid job performance degradation caused by
>>>      too many control events.
>>>
>>> I suggest splitting the whole design into two after we reach a consensus
>>> on whether to introduce this feature, because both sub-topics need
>>> careful design.
>>>
>>> [1]
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>>>
>>> Best regards,
>>> JING ZHANG
>>>
>>> 刘建刚 <liujiangangp...@gmail.com> wrote on Tue, Jun 8, 2021 at 10:01 AM:
>>>
>>>> Thanks Xintong Song for the detailed supplement. Since Flink is
>>>> long-running, it is similar to many services, so interacting with it or
>>>> controlling it is a common desire. This was our initial thought when
>>>> implementing the feature. In our internal Flink, many configs set in YAML
>>>> can be adjusted dynamically to avoid restarting the job, for example:
>>>>
>>>>    1. Limit the input QPS.
>>>>    2. Degrade the job by sampling and so on.
>>>>    3. Reset the Kafka offset in certain cases.
>>>>    4. Stop checkpointing in certain cases.
>>>>    5. Control the consumption of historical data.
>>>>    6. Change the log level for debugging.
>>>>
>>>> After deep discussion, we realize that a common control flow
>>>> will benefit both users and developers. Dynamic config is just one of the
>>>> use cases. The concrete design and implementation touch many components,
>>>> like the JobMaster, network channels, operators and so on, and need
>>>> deeper consideration and design.
>>>>
>>>> Xintong Song [via Apache Flink User Mailing List archive.]
>>>> <ml+s2336050n44245...@n4.nabble.com> wrote on Mon, Jun 7, 2021 at 2:52 PM:
>>>>
>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the
>>>>> feedback.
>>>>>
>>>>> I was part of the preliminary offline discussions before this proposal
>>>>> went public. So maybe I can help clarify things a bit.
>>>>>
>>>>> In short, although the phrase "control mode" might be a bit misleading,
>>>>> what we truly want to do, from my side, is to make the concept of
>>>>> "control flow" explicit and expose it to users.
>>>>>
>>>>> ## Background
>>>>> Jiangang & his colleagues at Kuaishou maintain an internal version of
>>>>> Flink. One of their custom features is allowing operator behaviors to be
>>>>> changed dynamically via the REST APIs. He's willing to contribute this
>>>>> feature to the community, and came to Yun Gao and me for suggestions.
>>>>> After discussion, we feel that the underlying question to be answered is
>>>>> how we model the control flow in Flink. Dynamically controlling jobs via
>>>>> the REST API can be one of the features built on top of the control
>>>>> flow, and there could be others.
>>>>>
>>>>> ## Control flow
>>>>> Control flow refers to the communication channels for sending
>>>>> events/signals to/between tasks/operators that change Flink's behavior
>>>>> in a way that may or may not affect the computation logic.
>>>>> Typical control events/signals Flink currently has are watermarks and
>>>>> checkpoint barriers.
>>>>>
>>>>> In general, for modeling control flow, the following questions should
>>>>> be considered.
>>>>> 1. Who (which component) is responsible for generating the control
>>>>> messages?
>>>>> 2. Who (which component) is responsible for reacting to the messages?
>>>>> 3. How do the messages propagate?
>>>>> 4. When it comes to affecting the computation logic, how should the
>>>>> control flow work together with the exactly-once consistency?
>>>>>
>>>>> 1) & 2) may vary depending on the use cases, while 3) & 4) probably
>>>>> share many things in common. A unified control flow model would help
>>>>> deduplicate the common logic, allowing us to focus on the use case
>>>>> specific parts.
>>>>>
>>>>> E.g.,
>>>>> - Watermarks: generated by source operators, handled by window
>>>>> operators.
>>>>> - Checkpoint barrier: generated by the checkpoint coordinator, handled
>>>>> by all tasks.
>>>>> - Dynamic controlling: generated by JobMaster (in reaction to the REST
>>>>> command), handled by specific operators/UDFs.
>>>>> - Operator defined events: The following features are still in
>>>>> planning, but may potentially benefit from the control flow model. (Please
>>>>> correct me if I'm wrong, @Yun, @Jark)
>>>>>   * Iteration: When a certain condition is met, we might want to
>>>>>     signal downstream operators with an event.
>>>>>   * Mini-batch assembling: Flink currently uses special watermarks for
>>>>>     indicating the end of each mini-batch, which makes it tricky to deal
>>>>>     with event time related computations.
>>>>>   * Hive dimension table join: For periodically reloaded Hive tables,
>>>>>     it would be helpful to have specific events signaling that a
>>>>>     reload has finished.
>>>>>   * Bootstrap dimension table join: This is similar to the previous
>>>>>     one.
>>>>>     In cases where we want to fully load the dimension table before
>>>>>     starting to join the main stream, it would be helpful to have an
>>>>>     event signaling that the bootstrap has finished.
>>>>>
>>>>> ## Dynamic REST controlling
>>>>> Back to the specific feature that Jiangang proposed, I personally
>>>>> think it's quite convenient. Currently, to dynamically change the behavior
>>>>> of an operator, we need to set up a separate source for the control events
>>>>> and leverage broadcast state. Being able to send the events via REST APIs
>>>>> definitely improves the usability.
>>>>>
>>>>> Leveraging dynamic configuration frameworks is for sure one possible
>>>>> approach. The reasons we are in favor of introducing the control flow
>>>>> are:
>>>>> - It benefits not only this specific dynamic controlling feature, but
>>>>> potentially other future features as well.
>>>>> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
>>>>> framework work together with Flink's consistency mechanism.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
>>>>>
>>>>>> Thank you for the reply. I have checked the post you mentioned. The
>>>>>> dynamic config may be useful sometimes, but it is hard to keep data
>>>>>> consistent in Flink: for example, what if the dynamic config takes
>>>>>> effect during a failover? Since dynamic config is something users
>>>>>> want, maybe Flink can support it in some way.
>>>>>>
>>>>>> As for the control mode, dynamic config is just one of the control
>>>>>> modes. In the Google doc, I have listed some other cases. For example,
>>>>>> control events can be generated in operators or in external services.
>>>>>> Besides users' dynamic config, the Flink system can support some common
>>>>>> dynamic configuration, like QPS limits, checkpoint control and so on.
>>>>>>
>>>>>> It needs a good design to handle the control mode structure. Based on
>>>>>> that, other control features can be added easily later, like changing
>>>>>> the log level while the job is running. In the end, Flink will not just
>>>>>> process data, but also interact with users to receive control events,
>>>>>> like a service.
>>>>>>
>>>>>> Steven Wu <[hidden email]> wrote on Fri, Jun 4, 2021 at 11:11 PM:
>>>>>>
>>>>>>> I am not sure if we should solve this problem in Flink. This is more
>>>>>>> like a dynamic config problem that probably should be solved by some
>>>>>>> configuration framework. Here is one post from a Google search:
>>>>>>> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>>>>>>
>>>>>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> Flink jobs are always long-running. While a job is running,
>>>>>>>> users may want to control it without stopping it. The reasons for
>>>>>>>> control can vary, for example:
>>>>>>>>
>>>>>>>>    1. Change the data processing logic, such as a filter condition.
>>>>>>>>    2. Send trigger events to move the progress forward.
>>>>>>>>    3. Define some tools to degrade the job, such as limiting the
>>>>>>>>    input QPS or sampling data.
>>>>>>>>    4. Change the log level to debug a current problem.
>>>>>>>>
>>>>>>>> The common way to do this is to stop the job, make the
>>>>>>>> modifications and start the job again. It may take a long time to
>>>>>>>> recover. In some situations, stopping jobs is intolerable, for
>>>>>>>> example when the job is related to money or important activities. So
>>>>>>>> we need some technologies to control the running job without
>>>>>>>> stopping the job.
>>>>>>>>
>>>>>>>> We propose to add a control mode for Flink. A control mode based on
>>>>>>>> the RESTful interface is introduced first. It works in these steps:
>>>>>>>>
>>>>>>>>    1. The user predefines some logic that supports config
>>>>>>>>    control, such as a filter condition.
>>>>>>>>    2. Run the job.
>>>>>>>>    3. If the user wants to change the job's running logic, they just
>>>>>>>>    send a RESTful request with the corresponding config.
>>>>>>>>
>>>>>>>> Other control modes will also be considered in the future. More
>>>>>>>> details can be found in the doc
>>>>>>>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>>>>>>>> . If the community likes the proposal, more discussion is needed and
>>>>>>>> a more detailed design will be given later. Any suggestions and
>>>>>>>> ideas are welcome.
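
P.S. To make the two concerns at the top of this message concrete (a config
that is meant for one operator but is broadcast to all, and a control signal
that is lost if the JobManager dies before it is propagated), here is a
minimal Python sketch. It is a toy simulation, not Flink code, and nothing in
it is an existing or proposed Flink API: `ControlSignal`, `DurableSignalLog`,
`Operator`, `target_operator` and `last_checkpointed_id` are all hypothetical
names, and "persist the signal before acknowledging the REST call, then replay
on recovery" is only one possible answer that I am assuming for illustration.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ControlSignal:
    """Hypothetical control message; not an existing Flink type."""
    signal_id: int          # monotonically increasing id assigned on receipt
    target_operator: str    # assumed: name of the one operator that should react
    key: str
    value: str


class DurableSignalLog:
    """Append-only file; a signal counts as accepted only once it is on disk."""

    def __init__(self, path):
        self.path = path

    def append(self, signal):
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(signal)) + "\n")
            f.flush()
            os.fsync(f.fileno())  # force to disk before acknowledging the caller

    def replay(self, after_id):
        """Return all persisted signals newer than the last checkpointed one."""
        if not os.path.exists(self.path):
            return []
        with open(self.path, encoding="utf-8") as f:
            signals = [ControlSignal(**json.loads(line)) for line in f if line.strip()]
        return [s for s in signals if s.signal_id > after_id]


class Operator:
    def __init__(self, name):
        self.name = name
        self.config = {}

    def on_control_signal(self, signal):
        # The signal is broadcast to every operator; only the target applies it.
        if signal.target_operator == self.name:
            self.config[signal.key] = signal.value


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "signals.log")

        # "REST handler": persist first, then acknowledge.
        DurableSignalLog(path).append(ControlSignal(1, "source", "max_qps", "1000"))

        # ... the coordinator dies here, before any operator saw the signal ...

        # A new coordinator recovers; no signal was covered by a checkpoint yet.
        last_checkpointed_id = 0
        operators = [Operator("source"), Operator("sink")]
        for signal in DurableSignalLog(path).replay(after_id=last_checkpointed_id):
            for op in operators:
                op.on_control_signal(signal)
        return {op.name: op.config for op in operators}


if __name__ == "__main__":
    print(demo())
```

In this toy version the signal survives the simulated crash because it is
replayed from the log, and the `sink` operator ignores it because it is not
the target. Whether such a log should live in the JobManager's HA storage, in
the checkpoint itself, or somewhere else is exactly the open question.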