Big +1 for this feature. For example:

1. Reset the Kafka offset in certain cases.
2. Stop checkpointing in certain cases.
3. Change the log level for debugging.
On Fri, Jun 11, 2021 at 12:17 PM 刘建刚 <liujiangangp...@gmail.com> wrote:

> Thanks for all the discussions and suggestions. The topic has been discussed for about a week, so it is time to draw a conclusion. New ideas are still welcome.
>
> First, the topic started with use cases for a RESTful interface. A RESTful interface supports many useful interactions with users, such as the following. It is an easier way to control the job than the broadcast API.
>
> 1. Change the data processing logic through dynamic configs, such as a filter condition.
> 2. Define tools to control the job, such as a QPS limit, sampling, changing the log level, and so on.
>
> Second, we broadened the topic to control flow so that it supports all kinds of control events beyond the use cases above. There is strong demand for custom (broadcast) events for iterations, SQL control events, and so on. As Xintong Song said, control flow comes down to these questions:
>
> 1. Who (which component) is responsible for generating the control messages? It may be the JobMaster, an inner operator, and so on.
> 2. Who (which component) is responsible for reacting to the messages?
> 3. How do the messages propagate? Flink should support sending control messages through channels.
> 4. When it comes to affecting the computation logic, how should the control flow work together with exactly-once consistency? To reuse the checkpoint mechanism, it may be a good idea to have control messages flow from the sources to the downstream tasks.
>
> Third, a common and flexible control flow needs a solid design and implementation as its foundation. Both future and existing features should be considered. For future features, a common RESTful interface is needed first to support dynamic configs. For existing features, there are checkpoint barriers, watermarks, and latency markers. They have some special behaviors but also have a lot in common. The common logic should be considered, but they may need to stay unchanged until the control flow is stable.
>
> Some other problems:
>
> 1. How do we persist the control signals when the JobMaster fails? One idea is to persist control signals in HighAvailabilityServices and replay them after failover. The RESTful request should be non-blocking.
> 2. Should all operators receive the control messages? All operators should be able to receive control messages from upstream operators, even if they do not process them. If we want to persist the control message state, all subtasks of one operator should see the same control events so that rescaling stays easy.
>
> As a next step, I will draft a FLIP scoped to a common control flow framework. More discussions, ideas, and problems are still welcome.
>
> Thank you~
>
> Jiangang Liu
>
> On Wed, Jun 9, 2021 at 12:01 PM Xintong Song <tonysong...@gmail.com> wrote:
>
>>> 2. There are two kinds of existing special elements: special stream records (e.g. watermarks) and events (e.g. checkpoint barriers). They all flow through the whole DAG. Events need to be acknowledged by downstream tasks and can overtake records; stream records do neither. So I'm wondering whether we plan to unify the two approaches in the new control flow (Xintong mentioned both in his previous mails)?
>>
>> TBH, I don't really know yet. We feel that control flow is a non-trivial topic, and it is better to bring it up publicly as early as possible, while the concrete plan is still being worked out.
>>
>> Personally, I'm leaning towards not touching the existing watermarks and checkpoint barriers in the first step.
>> - I'd expect the control flow to be introduced as an experimental feature that takes time to stabilize. It would be better if important existing features like checkpointing and watermarks stay unaffected.
>> - Checkpoint barriers are a little different, because other control messages partly rely on them to achieve exactly-once consistency. Without a concrete design, I'm not entirely sure whether they can be properly modeled as a special case of general control messages.
>> - Watermarks are probably similar to the other control messages. However, they are already exposed to users through public APIs. If we want to migrate them to the new control flow, we must be very careful not to break compatibility.
>>
>> Thank you~
>>
>> Xintong Song
>>
>> On Wed, Jun 9, 2021 at 11:30 AM Steven Wu <stevenz...@gmail.com> wrote:
>>
>>>> producing control events from JobMaster is similar to triggering a savepoint.
>>>
>>> Paul, here is the difference as I see it: upon job or JobManager recovery, we don't need to recover and replay the savepoint trigger signal.
>>>
>>> On Tue, Jun 8, 2021 at 8:20 PM Paul Lam <paullin3...@gmail.com> wrote:
>>>
>>>> +1 for this feature. Setting up a separate control stream is too much for many use cases. It would be very helpful if users could leverage Flink's built-in control flow.
>>>>
>>>> My 2 cents:
>>>> 1. @Steven IMHO, producing control events from the JobMaster is similar to triggering a savepoint. The REST API is non-blocking, and users should poll the results to confirm that the operation succeeded. If something goes wrong, it's the user's responsibility to retry.
>>>> 2. There are two kinds of existing special elements: special stream records (e.g. watermarks) and events (e.g. checkpoint barriers). They all flow through the whole DAG. Events need to be acknowledged by downstream tasks and can overtake records; stream records do neither. So I'm wondering whether we plan to unify the two approaches in the new control flow (Xintong mentioned both in his previous mails)?
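The following is a minimal sketch of the distinction Paul draws. It assumes a single input channel modeled as a Python `deque`. Watermark-like stream records keep their place among data records. Priority events jump ahead of buffered data, roughly the way unaligned checkpoint barriers are prioritized. `Element` and `InputChannel` are hypothetical names, not Flink's network stack, and the sketch does not model acknowledgements.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Element:
    kind: str       # "record", "watermark" (in-band stream record) or "barrier" (priority event)
    payload: object


class InputChannel:
    """Hypothetical single input channel; not Flink's actual network stack."""

    def __init__(self):
        self._queue = deque()
        self._num_priority = 0  # priority events currently waiting at the head

    def add(self, element):
        if element.kind == "barrier":
            # Priority events overtake buffered data but keep their own FIFO order.
            self._queue.insert(self._num_priority, element)
            self._num_priority += 1
        else:
            # Records and watermark-like stream records keep their arrival order.
            self._queue.append(element)

    def poll(self):
        element = self._queue.popleft()
        if element.kind == "barrier":
            self._num_priority -= 1
        return element

    def __len__(self):
        return len(self._queue)


channel = InputChannel()
channel.add(Element("record", 1))
channel.add(Element("record", 2))
channel.add(Element("watermark", 10))
channel.add(Element("barrier", "chk-1"))
order = []
while len(channel):
    element = channel.poll()
    order.append((element.kind, element.payload))
print(order)
# [('barrier', 'chk-1'), ('record', 1), ('record', 2), ('watermark', 10)]
```

Unifying the two approaches would mean deciding, per control message type, which of these two insertion rules applies.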
>>>>
>>>> Best,
>>>> Paul Lam
>>>>
>>>> On Jun 8, 2021, at 14:08, Steven Wu <stevenz...@gmail.com> wrote:
>>>>
>>>> I can see the benefits of control flow. E.g., it might help the old (and inactive) FLIP-17 side input. I would suggest that we add more details on some of the potential use cases.
>>>>
>>>> Here is one mismatch with using control flow for dynamic config. Dynamic config is typically targeted at, or loaded by, one specific operator, while control flow will propagate the dynamic config to all operators. Not a problem per se.
>>>>
>>>> Regarding using the REST API (to the JobManager) to accept control signals from an external system: where are we going to persist/checkpoint the signal? The JobManager can die before the control signal is propagated and checkpointed. Would we lose the control signal in this case?
>>>>
>>>> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <tonysong...@gmail.com> wrote:
>>>>
>>>>> +1 on separating the effort into two steps:
>>>>>
>>>>> 1. Introduce a common control flow framework, with flexible interfaces for generating/reacting to control messages for various purposes.
>>>>> 2. Work on features that leverage the control flow concurrently.
>>>>>
>>>>> Meanwhile, it would help to keep collecting potential features that may leverage the control flow. They provide good input for the framework design and help make it general enough to cover the potential use cases.
>>>>>
>>>>> My suggestions on the next steps:
>>>>>
>>>>> 1. Allow more time for opinions to be heard and potential use cases to be collected.
>>>>> 2. Draft a FLIP scoped to a common control flow framework.
>>>>> 3. We probably need a PoC implementation to make sure the framework covers at least the following scenarios:
>>>>>    1. Produce control events from arbitrary operators
>>>>>    2. Produce control events from the JobMaster
>>>>>    3. Consume control events in arbitrary operators downstream of where the events are produced
>>>>>
>>>>> Thank you~
>>>>> Xintong Song
>>>>>
>>>>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <yungao...@aliyun.com> wrote:
>>>>>
>>>>>> Many thanks to Jiangang for bringing this up, and thanks for the discussion!
>>>>>>
>>>>>> I also agree with Xintong's and Jing's summaries: control flow seems to be a common building block for many functionalities, and the dynamic configuration framework is a representative application that users frequently request. Regarding the control flow, we are currently also designing iterations for flink-ml. As Xintong pointed out, iterations also need the control flow, for example to detect global termination inside the iteration. In that case we need to broadcast an event through the iteration body to detect whether any records still reside in it. As for whether to implement the dynamic configuration framework, I also agree with Xintong that the consistency guarantee is a point to consider. We might need to decide whether to ensure that every operator receives the dynamic configuration.
>>>>>>
>>>>>> Best,
>>>>>> Yun
>>>>>>
>>>>>> ------------------------------------------------------------------
>>>>>> From: kai wang <yiduwang...@gmail.com>
>>>>>> Date: 2021/06/08 11:52:12
>>>>>> To: JING ZHANG <beyond1...@gmail.com>
>>>>>> Cc: 刘建刚 <liujiangangp...@gmail.com>; Xintong Song [via Apache Flink User Mailing List archive.] <ml+s2336050n44245...@n4.nabble.com>; user <user@flink.apache.org>; dev <d...@flink.apache.org>
>>>>>> Subject: Re: Add control mode for flink
>>>>>>
>>>>>> I'm big +1 for this feature.
>>>>>>
>>>>>> 1. Limit the input QPS.
>>>>>> 2. Change the log level for debugging.
>>>>>>
>>>>>> My team needs both of the examples above.
>>>>>>
>>>>>> On Tue, Jun 8, 2021 at 11:18 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Jiangang for bringing this up.
>>>>>>> As mentioned in Jiangang's email, the `dynamic configuration framework` provides many useful functions at Kuaishou, because it can update job behavior without relaunching the job. These functions are very popular at Kuaishou, and we also see similar demands on the mailing list [1].
>>>>>>>
>>>>>>> I'm big +1 for this feature.
>>>>>>>
>>>>>>> Thanks Xintong and Yun for the deep thoughts on this issue. I like the idea of introducing a control mode in Flink. It brings the original issue a big step closer to its essence. It also opens the door to more features, as mentioned in Xintong's and Jark's responses.
>>>>>>> Based on this idea, there are at least two milestones toward the goals Jiangang proposed:
>>>>>>> (1) Build a common control flow framework in Flink.
>>>>>>>     It focuses on control flow propagation and on how to integrate the common control flow framework with existing mechanisms.
>>>>>>> (2) Build a dynamic configuration framework that is exposed to users directly.
>>>>>>>     The dynamic configuration framework is a top-level application of the underlying control flow framework.
>>>>>>>     It focuses on the public API that receives configuration update requests from users. It also needs an API protection mechanism so that too many control events do not degrade job performance.
>>>>>>>
>>>>>>> I suggest splitting the whole design into two once we reach consensus on whether to introduce this feature, because both sub-topics need careful design.
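One way to get the API protection mechanism JING mentions is to rate-limit incoming control requests. The sketch below uses a token bucket, which is a technique swapped in here for illustration; nobody in the thread proposed it. `TokenBucket` and its parameters are hypothetical. The clock is injectable so the behavior is deterministic.

```python
class TokenBucket:
    """Token-bucket limiter for incoming control requests (an assumed technique)."""

    def __init__(self, rate_per_sec, burst, clock):
        self.rate = rate_per_sec
        self.capacity = burst
        self.tokens = float(burst)
        self.clock = clock
        self.last = clock()

    def try_acquire(self):
        now = self.clock()
        # Refill in proportion to the elapsed time, capped at the burst size.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # caller would reject the control request, e.g. with HTTP 429


# A fake clock makes the example deterministic.
fake_time = [0.0]
limiter = TokenBucket(rate_per_sec=1.0, burst=2, clock=lambda: fake_time[0])
results = [limiter.try_acquire() for _ in range(3)]  # burst of 3 requests at t=0
fake_time[0] = 1.5
results.append(limiter.try_acquire())                # 1.5 tokens refilled by now
print(results)  # [True, True, False, True]
```

Limiting at the entry point keeps excess control events from ever entering the data channels.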
>> >>>>> >> >>>>> >> >>>>> [ >> >>>>> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html >> >>>>> ] >> >>>>> >> >>>>> Best regards, >> >>>>> JING ZHANG >> >>>>> >> >>>>> 刘建刚 <liujiangangp...@gmail.com> 于2021年6月8日周二 上午10:01写道: >> >>>>> >> >>>>>> Thanks Xintong Song for the detailed supplement. Since flink is >> >>>>>> long-running, it is similar to many services. So interacting with >> it or >> >>>>>> controlling it is a common desire. This was our initial thought >> when >> >>>>>> implementing the feature. In our inner flink, many configs used in >> yaml can >> >>>>>> be adjusted by dynamic to avoid restarting the job, for examples >> as follow: >> >>>>>> >> >>>>>> 1. Limit the input qps. >> >>>>>> 2. Degrade the job by sampling and so on. >> >>>>>> 3. Reset kafka offset in certain cases. >> >>>>>> 4. Stop checkpoint in certain cases. >> >>>>>> 5. Control the history consuming. >> >>>>>> 6. Change log level for debug. >> >>>>>> >> >>>>>> >> >>>>>> After deep discussion, we realize that a common control flow >> >>>>>> will benefit both users and developers. Dynamic config is just one >> of the >> >>>>>> use cases. For the concrete design and implementation, it relates >> with many >> >>>>>> components, like jobmaster, network channel, operators and so on, >> which >> >>>>>> needs deeper consideration and design. >> >>>>>> >> >>>>>> Xintong Song [via Apache Flink User Mailing List archive.] < >> >>>>>> ml+s2336050n44245...@n4.nabble.com> 于2021年6月7日周一 下午2:52写道: >> >>>>>> >> >>>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the >> >>>>>>> feedback. >> >>>>>>> >> >>>>>>> I was part of the preliminary offline discussions before this >> >>>>>>> proposal went public. So maybe I can help clarify things a bit. 
>>>>>>>>>
>>>>>>>>> In short, the phrase "control mode" might be a bit misleading. From my side, what we truly want is to make the concept of "control flow" explicit and expose it to users.
>>>>>>>>>
>>>>>>>>> ## Background
>>>>>>>>> Jiangang and his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features allows dynamically changing operator behaviors via REST APIs. He's willing to contribute this feature to the community and came to Yun Gao and me for suggestions. After discussion, we feel the underlying question is how to model the control flow in Flink. Dynamically controlling jobs via a REST API can be one of the features built on top of the control flow, and there could be others.
>>>>>>>>>
>>>>>>>>> ## Control flow
>>>>>>>>> Control flow refers to the communication channels that send events/signals to or between tasks/operators. These events change Flink's behavior in ways that may or may not affect the computation logic. The typical control events/signals Flink has today are watermarks and checkpoint barriers.
>>>>>>>>>
>>>>>>>>> In general, modeling control flow means answering the following questions:
>>>>>>>>> 1. Who (which component) is responsible for generating the control messages?
>>>>>>>>> 2. Who (which component) is responsible for reacting to the messages?
>>>>>>>>> 3. How do the messages propagate?
>>>>>>>>> 4. When it comes to affecting the computation logic, how should the control flow work together with exactly-once consistency?
>>>>>>>>>
>>>>>>>>> 1) & 2) may vary by use case, while 3) & 4) probably have much in common. A unified control flow model would help deduplicate the common logic, allowing us to focus on the use-case-specific parts.
>>>>>>>>>
>>>>>>>>> E.g.,
>>>>>>>>> - Watermarks: generated by source operators, handled by window operators.
>>>>>>>>> - Checkpoint barriers: generated by the checkpoint coordinator, handled by all tasks.
>>>>>>>>> - Dynamic controlling: generated by the JobMaster (in reaction to the REST command), handled by specific operators/UDFs.
>>>>>>>>> - Operator-defined events: The following features are still being planned but may benefit from the control flow model. (Please correct me if I'm wrong, @Yun, @Jark)
>>>>>>>>>   * Iteration: When a certain condition is met, we might want to signal downstream operators with an event.
>>>>>>>>>   * Mini-batch assembling: Flink currently uses special watermarks to mark the end of each mini-batch, which makes event-time computations tricky.
>>>>>>>>>   * Hive dimension table join: For periodically reloaded Hive tables, it would be helpful to have specific events signaling that a reload has finished.
>>>>>>>>>   * Bootstrap dimension table join: This is similar to the previous one. When we want to fully load the dimension table before joining the main stream, an event signaling the end of the bootstrap would help.
>>>>>>>>>
>>>>>>>>> ## Dynamic REST controlling
>>>>>>>>> Back to the specific feature that Jiangang proposed: I personally think it's quite convenient. Currently, to dynamically change the behavior of an operator, we need to set up a separate source for the control events and use broadcast state. Being able to send the events via REST APIs would definitely improve usability.
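The generate/react/propagate split above can be sketched without Flink, assuming a linear chain of operators. A JobMaster-like caller injects a `ControlEvent` into the stream at the first operator. Every operator receives the event and forwards it downstream, but only the targeted one reacts. `ControlEvent`, `Operator`, and the `target` field are hypothetical names, not Flink APIs.

```python
from dataclasses import dataclass


@dataclass
class ControlEvent:
    target: str   # name of the operator expected to react (hypothetical field)
    key: str
    value: object


class Operator:
    """Hypothetical operator: all operators receive control events,
    only the targeted one reacts, and every one forwards the event."""

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn          # fn(config, record) -> record, or None to drop it
        self.config = {}
        self.downstream = None

    def process(self, element, sink):
        if isinstance(element, ControlEvent):
            if element.target == self.name:
                self.config[element.key] = element.value  # react
            if self.downstream:
                self.downstream.process(element, sink)    # propagate
            return
        result = self.fn(self.config, element)
        if result is None:
            return
        if self.downstream:
            self.downstream.process(result, sink)
        else:
            sink.append(result)


filter_op = Operator("filter", lambda cfg, r: r if r >= cfg.get("min", 0) else None)
map_op = Operator("map", lambda cfg, r: r * cfg.get("factor", 1))
filter_op.downstream = map_op

# The control events sit in-band, between data records, as if injected at the source.
stream = [1, 5, ControlEvent("filter", "min", 3), 1, 5, ControlEvent("map", "factor", 10), 4]
sink = []
for element in stream:
    filter_op.process(element, sink)
print(sink)  # [1, 5, 5, 40]
```

The events travel in-band, so their position among the records decides exactly which records see the old configuration and which see the new one. A checkpoint-based, exactly-once design could build on that ordering.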
>>>>>>>>>
>>>>>>>>> Using a dynamic configuration framework is certainly one possible approach. We favor introducing the control flow for two reasons:
>>>>>>>>> - It benefits not only this specific dynamic controlling feature, but potentially other future features as well.
>>>>>>>>> - AFAICS, it's non-trivial to make a third-party dynamic configuration framework work with Flink's consistency mechanism.
>>>>>>>>>
>>>>>>>>> Thank you~
>>>>>>>>> Xintong Song
>>>>>>>>>
>>>>>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you for the reply. I have checked the post you mentioned. Dynamic config may be useful sometimes, but it is hard to keep data consistent in Flink. For example, what happens if the dynamic config takes effect during a failover? Since users want dynamic config, maybe Flink can support it in some way.
>>>>>>>>>>
>>>>>>>>>> Dynamic config is just one of the control modes. In the Google doc, I have listed some other cases, such as control events generated by operators or external services. Beyond users' own dynamic configs, Flink itself could support some common dynamic configurations, like a QPS limit, checkpoint control, and so on.
>>>>>>>>>>
>>>>>>>>>> The control mode structure needs a good design. With that in place, other control features can easily be added later, like changing the log level while the job is running. In the end, Flink will not just process data; it will also interact with users and receive control events, like a service.
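On the question of what happens to a dynamic config during failover, here is one possible approach, sketched under stated assumptions. Accepted control signals go into a durable, sequence-numbered log, a stand-in for something like the HighAvailabilityServices idea in the June 11 summary. Each operator checkpoints its config together with the last sequence number it applied. On restore, it replays only the signals that came after its snapshot. `ControlLog` and `ConfigurableOperator` are hypothetical names.

```python
import copy


class ControlLog:
    """Hypothetical durable log of accepted control signals; it outlives task failures."""

    def __init__(self):
        self.entries = []  # list of (seq, key, value)

    def append(self, key, value):
        seq = len(self.entries) + 1
        self.entries.append((seq, key, value))
        return seq


class ConfigurableOperator:
    def __init__(self, log):
        self.log = log
        self.config = {}
        self.last_applied = 0

    def apply_pending(self):
        # Apply every logged signal not yet reflected in this operator's state.
        for seq, key, value in self.log.entries:
            if seq > self.last_applied:
                self.config[key] = value
                self.last_applied = seq

    def snapshot(self):
        # Config and log position are checkpointed together, so they stay consistent.
        return copy.deepcopy({"config": self.config, "last_applied": self.last_applied})

    def restore(self, snapshot):
        self.config = copy.deepcopy(snapshot["config"])
        self.last_applied = snapshot["last_applied"]
        self.apply_pending()  # replay signals accepted after the checkpoint


log = ControlLog()
op = ConfigurableOperator(log)
log.append("qps_limit", 100)
op.apply_pending()
chk = op.snapshot()          # the checkpoint contains qps_limit=100
log.append("qps_limit", 50)  # accepted after the checkpoint, then the task fails
recovered = ConfigurableOperator(log)
recovered.restore(chk)
print(recovered.config, recovered.last_applied)  # {'qps_limit': 50} 2
```

This avoids both losing a signal and applying it twice. It deliberately ignores where in the replayed data stream the recovered config takes effect, which is the harder part of the consistency question raised in this thread.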
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 4, 2021 at 11:11 PM Steven Wu <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am not sure we should solve this problem in Flink. This is more like a dynamic config problem that should probably be solved by a configuration framework. Here is one post from a Google search:
>>>>>>>>>>> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> Flink jobs are usually long-running. While a job is running, users may want to control it without stopping it, for reasons such as:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Change the data processing logic, such as a filter condition.
>>>>>>>>>>>> 2. Send trigger events to move the progress forward.
>>>>>>>>>>>> 3. Define tools to degrade the job, such as limiting the input QPS or sampling data.
>>>>>>>>>>>> 4. Change the log level to debug a current problem.
>>>>>>>>>>>>
>>>>>>>>>>>> The usual way to do this is to stop the job, make the modifications, and restart it, and recovery may take a long time. In some situations, stopping a job is intolerable, for example when it is related to money or important activities. So we need a way to control a running job without stopping it.
>>>>>>>>>>>>
>>>>>>>>>>>> We propose adding a control mode to Flink, starting with a control mode based on a RESTful interface. It works in these steps:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. The user predefines some logic that supports config control, such as a filter condition.
>>>>>>>>>>>> 2. Run the job.
>>>>>>>>>>>> 3. To change the job's running logic, the user just sends a RESTful request with the corresponding config.
>>>>>>>>>>>>
>>>>>>>>>>>> Other control modes will also be considered in the future. For more details, please refer to the doc https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing. If the community likes the proposal, more discussion is needed, and a more detailed design will follow. Any suggestions and ideas are welcome.
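The three steps of the RESTful control mode can be sketched in plain Python under some assumptions. The user declares up front which parameters are dynamically configurable. The running logic reads them on every call. A handler function stands in for the REST endpoint. `DynamicConfig`, `handle_control_request`, and the JSON body shape are all hypothetical. The lock exists because a request-handling thread and the processing thread would share the registry. The sketch skips routing the request through the JobMaster to every subtask and keeping it consistent with checkpoints.

```python
import json
import threading


class DynamicConfig:
    """Hypothetical registry of parameters the user declared as changeable at runtime."""

    def __init__(self, **defaults):
        self._lock = threading.Lock()
        self._values = dict(defaults)

    def get(self, key):
        with self._lock:
            return self._values[key]

    def update_many(self, changes):
        with self._lock:
            unknown = [key for key in changes if key not in self._values]
            if unknown:
                # Reject the whole request so that no partial update is applied.
                raise KeyError(f"not dynamically configurable: {unknown}")
            self._values.update(changes)


def handle_control_request(config, body):
    """Stand-in for a REST handler; the JSON body shape is an assumption."""
    try:
        changes = json.loads(body)["config"]
        if not isinstance(changes, dict):
            raise ValueError("'config' must be a JSON object")
        config.update_many(changes)
    except (ValueError, KeyError, TypeError) as err:
        return 400, str(err)
    return 200, "accepted"


# Step 1: the user predefines logic that reads a dynamically configurable value.
config = DynamicConfig(min_amount=0)


def filter_orders(orders):
    threshold = config.get("min_amount")
    return [amount for amount in orders if amount >= threshold]


# Step 2: the "job" runs. Step 3: a control request changes its behavior.
batch = [10, 150, 99, 300]
before = filter_orders(batch)
status = handle_control_request(config, '{"config": {"min_amount": 100}}')
after = filter_orders(batch)
rejected = handle_control_request(config, '{"config": {"unknown_key": 1}}')
print(before, status, after, rejected[0])
```

Rejecting undeclared keys keeps the controllable surface limited to what the user opted into in step 1.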