Thanks Xintong for the summary, I'm a big +1 for this feature.
Xintong's summary of Table/SQL's needs is correct. The "custom (broadcast) event" feature is important to us and even blocks further awesome features and optimizations in Table/SQL. I have also discussed this topic offline with @Yun Gao <yungao...@aliyun.com> several times, and we all agreed this is a reasonable feature that nonetheless needs some careful design.

Best,
Jark

On Mon, 7 Jun 2021 at 14:52, Xintong Song <tonysong...@gmail.com> wrote:

> Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.
>
> I was part of the preliminary offline discussions before this proposal went public, so maybe I can help clarify things a bit.
>
> In short, although the phrase "control mode" might be a bit misleading, what we truly want to do from my side is to make the concept of "control flow" explicit and expose it to users.
>
> ## Background
>
> Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features allows dynamically changing operator behaviors via the REST APIs. He's willing to contribute this feature to the community, and came to Yun Gao and me for suggestions. After discussion, we felt that the underlying question to be answered is how we model the control flow in Flink. Dynamically controlling jobs via the REST API can be one of the features built on top of the control flow, and there could be others.
>
> ## Control flow
>
> Control flow refers to the communication channels for sending events/signals to/between tasks/operators that change Flink's behavior in a way that may or may not affect the computation logic. Typical control events/signals Flink currently has are watermarks and checkpoint barriers.
>
> In general, for modeling control flow, the following questions should be considered:
>
> 1. Who (which component) is responsible for generating the control messages?
> 2. Who (which component) is responsible for reacting to the messages?
> 3. How do the messages propagate?
> 4.
> When it comes to affecting the computation logic, how should the control flow work together with exactly-once consistency?
>
> 1) & 2) may vary depending on the use cases, while 3) & 4) probably share many things in common. A unified control flow model would help deduplicate the common logic, allowing us to focus on the use-case-specific parts.
>
> E.g.,
> - Watermarks: generated by source operators, handled by window operators.
> - Checkpoint barriers: generated by the checkpoint coordinator, handled by all tasks.
> - Dynamic controlling: generated by the JobMaster (in reaction to the REST command), handled by specific operators/UDFs.
> - Operator-defined events: The following features are still in planning, but may potentially benefit from the control flow model. (Please correct me if I'm wrong, @Yun, @Jark)
>   * Iteration: When a certain condition is met, we might want to signal downstream operators with an event.
>   * Mini-batch assembling: Flink currently uses special watermarks to indicate the end of each mini-batch, which makes it tricky to deal with event-time-related computations.
>   * Hive dimension table join: For periodically reloaded Hive tables, it would be helpful to have specific events signaling that a reload has finished.
>   * Bootstrap dimension table join: This is similar to the previous one. In cases where we want to fully load the dimension table before starting to join with the main stream, it would be helpful to have an event signaling the end of the bootstrap.
>
> ## Dynamic REST controlling
>
> Back to the specific feature that Jiangang proposed, I personally think it's quite convenient. Currently, to dynamically change the behavior of an operator, we need to set up a separate source for the control events and leverage broadcast state. Being able to send the events via REST APIs definitely improves usability.
>
> Leveraging dynamic configuration frameworks is for sure one possible approach.
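The broadcast-state workaround mentioned just above (a separate control-event source connected to the data stream) can be sketched in miniature. This is a plain-Java model of the pattern for illustration only; the class and method names are hypothetical, and it is not Flink's actual `BroadcastProcessFunction` API:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the "separate control source + broadcast state" pattern:
// the operator has two input paths, one for data records and one for control
// events, and holds mutable config that stands in for broadcast state.
public class BroadcastControlSketch {

    /** A control event carrying a new filter threshold. */
    record ControlEvent(int newThreshold) {}

    /** Operator with a data path and a control path, like a connected stream. */
    static class DynamicFilterOperator {
        private int threshold; // stands in for broadcast state

        DynamicFilterOperator(int initialThreshold) {
            this.threshold = initialThreshold;
        }

        /** Data path: keep records that pass the current filter. */
        boolean processElement(int value) {
            return value >= threshold;
        }

        /** Control path: a control event changes the operator's behavior. */
        void processControlEvent(ControlEvent event) {
            this.threshold = event.newThreshold();
        }
    }

    public static void main(String[] args) {
        DynamicFilterOperator op = new DynamicFilterOperator(10);
        List<Integer> out = new ArrayList<>();

        // Records arriving before the control event use the old threshold (10).
        for (int v : new int[] {5, 12, 9}) if (op.processElement(v)) out.add(v);

        // A control event arrives on the second input and updates the filter.
        op.processControlEvent(new ControlEvent(3));

        // Later records see the new threshold (3) -- no job restart needed.
        for (int v : new int[] {5, 12, 2}) if (op.processElement(v)) out.add(v);

        System.out.println(out); // [12, 5, 12]
    }
}
```

In real Flink, the control stream is broadcast so every parallel instance of the operator sees the same events; this single-threaded sketch only shows the two-input shape of the pattern.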
> The reasons we are in favor of introducing the control flow are that:
> - It benefits not only this specific dynamic controlling feature, but potentially other future features as well.
> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration framework work together with Flink's consistency mechanism.
>
> Thank you~
>
> Xintong Song
>
> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <liujiangangp...@gmail.com> wrote:
>
>> Thank you for the reply. I have checked the post you mentioned. The dynamic config may be useful sometimes, but it is hard to keep data consistent in Flink; for example, what happens if the dynamic config takes effect during a failover? Since dynamic config is something users want, maybe Flink can support it in some way.
>>
>> For the control mode, dynamic config is just one of the control modes. In the Google doc, I have listed some other cases, for example, control events generated in operators or by external services. Besides users' dynamic config, the Flink system can support some common dynamic configuration, like QPS limits, checkpoint control, and so on.
>>
>> The control mode structure needs a good design. Based on that, other control features can be added easily later, like changing the log level while the job is running. In the end, Flink will not just process data, but also interact with users to receive control events, like a service.
>>
>> Steven Wu <stevenz...@gmail.com> wrote on Fri, Jun 4, 2021 at 11:11 PM:
>>
>>> I am not sure if we should solve this problem in Flink. This is more like a dynamic config problem that should probably be solved by some configuration framework. Here is one post from a Google search: https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>>
>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <liujiangangp...@gmail.com> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> Flink jobs are always long-running.
>>>> When the job is running, users may want to control the job without stopping it. The reasons for control can differ, as follows:
>>>>
>>>> 1. Change the data processing logic, such as a filter condition.
>>>> 2. Send trigger events to move progress forward.
>>>> 3. Define some tools to degrade the job, such as limiting input QPS or sampling data.
>>>> 4. Change the log level to debug a current problem.
>>>>
>>>> The common way to do this is to stop the job, make the modifications, and restart the job. It may take a long time to recover. In some situations, stopping jobs is intolerable, for example, when the job is related to money or important activities. So we need some techniques to control the running job without stopping it.
>>>>
>>>> We propose to add a control mode for Flink. A control mode based on a RESTful interface is introduced first. It works in these steps:
>>>>
>>>> 1. The user predefines some logic that supports config control, such as a filter condition.
>>>> 2. Run the job.
>>>> 3. If the user wants to change the job's running logic, they just send a RESTful request with the corresponding config.
>>>>
>>>> Other control modes will also be considered in the future. For more details, refer to the doc https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing. If the community likes the proposal, more discussion is needed and a more detailed design will be given later. Any suggestions and ideas are welcome.
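The three steps of the proposed REST control mode can be sketched in miniature. This is a plain-Java illustration under assumed names (a shared config map, with `handleRestRequest` standing in for the REST endpoint); it is not the proposed Flink API, and it deliberately ignores the exactly-once consistency concerns raised earlier in the thread:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Toy model of the three steps: (1) predefine logic that reads its
// parameters from a config store, (2) run the job, (3) update the config
// through an external (REST-like) handler while the job keeps running.
// All names here are illustrative, not Flink API.
public class RestControlSketch {

    /** Shared, concurrently updatable job configuration. */
    static final Map<String, String> CONFIG =
            new ConcurrentHashMap<>(Map.of("filter.min", "10"));

    /** Step 1: a predefined filter whose condition is config-controlled. */
    static Predicate<Integer> configuredFilter() {
        // The config is read per record, so updates take effect immediately.
        return v -> v >= Integer.parseInt(CONFIG.get("filter.min"));
    }

    /** Step 3: stands in for a REST endpoint that accepts a new config value. */
    static void handleRestRequest(String key, String value) {
        CONFIG.put(key, value);
    }

    public static void main(String[] args) {
        Predicate<Integer> filter = configuredFilter();

        System.out.println(filter.test(5));  // false: 5 < 10 under the initial config

        // The "REST request" changes the filter condition without a restart.
        handleRestRequest("filter.min", "3");

        System.out.println(filter.test(5));  // true: 5 >= 3 under the updated config
    }
}
```

The hard part the thread points at is exactly what this sketch skips: in a distributed job, such an update must reach every parallel instance at a consistent point relative to the data stream and to checkpoints, which is why the discussion favors routing it through a first-class control flow rather than an out-of-band config store.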