Hi devs,

Thank you all for your inspirational responses, and sorry for the late reply. Since more and more people are joining the discussion and may have missed some of the previous replies, I'd like to summarize the unresolved comments so far here, to make sure we're all on the same page and don't miss any potential concerns.
1. Using SQL hints to configure TTL, with the consideration that they are simple and easy to understand. (Raised by @Yisha, @Shuo, @Shammon, @Benchao and @Martijn)

Using SQL hints was actually the first thing that came to my mind when thinking about the design, but as I explained in my previous reply, AFAIK, with the scope of operator-level configuration, I see limitations in this approach. I'm also curious how the hint approach would cover cases like
- configuring TTL for operators like ChangelogNormalize, SinkUpsertMaterializer, etc., which are derived implicitly by the planner;
- coping with the state TTL of two-/multiple-input stream operators, like join, and other operations like row_number, rank, correlate, etc.

Here I'd like to pick Q4 from the TPC-H v3.0.1 benchmark [1] to discuss the hint propagation mechanism. Q4 involves a sub-query correlation (which will ultimately be translated to a semi-join) and a group aggregation. From my own experience, I found it challenging to place hints that set different TTLs for the left join side, the right join side, and the group aggregate. It would be great if someone could shed some light on how to implement the hint approach well enough to cover cases like the following example. On the other hand, if we relax the constraints on configuration granularity, I think the hint approach might be helpful, and I would welcome a separate FLIP discussion. In a nutshell, I'm not unconditionally against using hints; I'm against using hints with the semantics of configuring TTL at operator granularity, and am ultimately open to the hint approach if the limitations can be resolved.
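To make the placement question concrete before diving into Q4: even for a plain two-way stream join there are at least two candidate places to attach a per-input TTL hint, and it is unclear which operator each hint would bind to. The sketch below uses the hypothetical hint syntax already discussed earlier in this thread; it is NOT supported by Flink today, and the table/column names and TTL values are made up for illustration.

```sql
-- Attempt 1: hint each source when declaring the views
CREATE TEMPORARY VIEW v1 AS
  SELECT * FROM t1 /*+ OPTIONS('table.exec.state.ttl' = '3 d') */;
CREATE TEMPORARY VIEW v2 AS
  SELECT * FROM t2 /*+ OPTIONS('table.exec.state.ttl' = '1 d') */;

-- Attempt 2: hint the inputs at the join itself
SELECT a.*, b.*
FROM v1 /*+ OPTIONS('table.exec.state.ttl' = '3 d') */ a
JOIN v2 /*+ OPTIONS('table.exec.state.ttl' = '1 d') */ b
ON a.k = b.k;
```

Does Attempt 1 propagate through the views to the join state, or stop at implicitly derived operators in between? Does Attempt 2 also govern stateful operators downstream of the join? Neither answer is obvious, which is the granularity problem described above.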
-- TPC-H Order Priority Checking Query (Q4)
select
    o_orderpriority,
    count(*) as order_count
from orders
where o_orderdate >= date '[DATE]'
    and o_orderdate < date '[DATE]' + interval '3' month
    and exists (
        select *
        from lineitem
        where l_orderkey = o_orderkey
            and l_commitdate < l_receiptdate)
group by o_orderpriority
order by o_orderpriority;

2. Renaming the FLIP title to indicate that it is an enhancement to the SQL compiled plan. (Raised by @Shuo and @Lincoln)

As previously replied, the compiled plan works for both Table API and SQL. Meanwhile, I don't mean to say that there is no possibility of any changes in the future. To make room for further discussions on hints, I think it is acceptable to change the title to "Enhance COMPILED PLAN to support operator-level state TTL configuration", as suggested by @Shuo.

3. The extra maintenance of the compiled plan breaks the habits of SQL users, and users must submit their jobs via the plan. (Raised by @Jing)

As explained in my reply, only the fine-grained TTL configuration case goes this way; other users' habits will not be disturbed, since this is an advanced-user requirement. Meanwhile, I checked the "Proposed Changes" part of FLIP-190; the first change is "Expose the JSON plan concept to users", so IIUC the JSON plan is not a new concept. May I conclude that the concern is mainly about the ease of use of the feature? If a graphical IDE could ease the usage, I suggest opening a separate FLIP to discuss whether we can bring the visualizer back. WDYT?

[image: image.png]

4. The compiled SQL plan introduced by FLIP-190 is only used for SQL job migration/update. The common stages that Flink uses to produce the execution plan from SQL do not contain the compiling step.
(Raised by @Jing)

IIUC, all SQL jobs go through the following three steps to run, with or without FLIP-190:
<1> parsing into an AST and then an Operation by the parser;
<2> optimizing the original rel with rule-based and cost-based optimizers into physical rel nodes and then exec nodes by the planner;
<3> transforming exec nodes into transformations and then generating the JobGraph to run.

FLIP-190 serializes the result of step <2> as a side output in JSON format and dumps it into a file. The file can serve as a hook that allows some advanced configuration to happen during the intermediate step before continuing with step <3>. From this point of view, I'd say FLIP-190 introduces a generalized mechanism, not just a use case for migration/upgrade.

5. A truth that cannot be ignored is that users usually tend to give up editing TTL (or operator ID in our case) instead of migrating this configuration between versions of one given job. Before we introduce compiled plan editing into the pipeline of SQL job development, maybe we should have a discussion of how to migrate these plans for users. (Raised by @Yisha)

Hi @Yisha, thanks so much for sharing your internal use case! I watched your sharing video [2] and am interested in the following points. It would be great if you're willing to share more details with us.
- You mentioned that there are 10k+ SQL jobs in your production environment, but only ~100 jobs' migration involves plan editing. Is 10k+ the number of total jobs, or the number of jobs that use stateful computation and need state migration?
- You mentioned that "A truth that can not be ignored is that users usually tend to give up editing TTL(or operator ID in our case) instead of migrating this configuration between their versions of one given job." So what would users prefer to do if they're reluctant to edit the operator ID?
Would they submit the same SQL as a new job with a higher version to re-accumulate the state from the earliest offset?

Back to your suggestions: I noticed that FLIP-190 [3] proposed the following syntax to perform plan migration. AFAIK, this is not implemented yet. Would you have any interest in opening a separate discussion to support this feature?

> // Perform plan migration in the future.
> // This statement has been added for completeness. Users will need to execute it
> // when we instruct them to do so in the release notes. Plan migration will be one
> // way of dropping legacy code in Flink before we have savepoint migration.
> COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

Looking forward to your feedback!

Best,
Jane

[1] Page #35 of https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v3.0.1.pdf
[2] Sharing at FFA 2021, Industry Best Practices, afternoon session on Jan. 8: https://developer.aliyun.com/special/ffa2021/live
[3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489

On Wed, Mar 29, 2023 at 11:55 AM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Thanks Jane driving this flip! And very glad to see the lively discussion
> which has inspired me a lot!
>
> I'd like to share some thoughts:
> as many of you mentioned, FLIP-113[1] formally introduced the Calcite's
> hints syntax and supported the dynamic table option(i.e., table hints), and
> then FLIP-229[2] completed the support for join hints(i.e., query hints),
> back to the problem that this proposal is trying to solve, I remembered the
> issue FLINK-24254[3] created by Timo, which wanted to introduce a new
> config option to achieve fine-grained source/sink configuration(a bit
> different from the current dynamic table option). I also agree with this
> categorization, so in general we may have three categories of hints (with
> possible new ones in the future):
> 1. dynamic table options
> 2.
query hints (join hints and possibly more support in the future) > 3. configuration hints (fine-grained config vs global config) > > So which category does the operator-level state ttl configuration fall > into? > Considering the current global configuration "table.exec.state.ttl", I > would put it in the third category, so I agree with Shuo's mention of > modifying the title of the flip, e.g. "Extend exec plan to support > operator-level state ttl configuration" > > Of course, this extends the original goal of FLINK-24254, not only to > source/sink, but to all stateful operators, but how to design a > configuration hints to support source/sink and all other operators in > general, I don't have a clear answer yet, maybe this is more suitable to be > discussed in a separate flip(hints or a lower level based > configuration,e.g., exec plan, are solutions at different levels, each with > its own advantages and disadvantages, and there seems to be no single > answer to what is the best way to do fine-grained configurations) > > So +1 for this proposal (before a more perfect configuration hints which > can balance the ease of use and generality) > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL > [2] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job > [3] https://issues.apache.org/jira/browse/FLINK-24254 > > Best, > Lincoln Lee > > > Martijn Visser <martijnvis...@apache.org> 于2023年3月28日周二 22:49写道: > > > Hi Jane, > > > > Thanks for creating the FLIP. In general I'm not a fan of using the query > > plan for enabling these kinds of use cases. It introduces a different way > > of submitting SQL jobs in our already extensive list of possibilities, > > making things complicated. I would have a preference for using hints, > given > > that we explicitly mention hints for "Operator resource constraints" [1]. 
> > For me, that feels like a more natural fit for this use case. > > > > I would like to get @Timo Walther <twal...@apache.org> his opinion on > this > > topic too. > > > > [1] > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/ > > > > On Mon, Mar 27, 2023 at 10:22 PM Jing Ge <j...@ververica.com.invalid> > > wrote: > > > > > Hi Jane, > > > > > > Thanks for clarifying it. As far as I am concerned, the issue is where > to > > > keep the user's job metadata, i.e. SQL script (to make the discussion > > > easier, let's ignore config). As long as FLIP-190 is only used for > > > migration/upgrade, SQL is the single source of truth. Once the compiled > > > plan has been modified, in this case ttls, the user's job metadata will > > be > > > distributed into two different places. Each time when the SQL needs > > > changes, extra effort will be required to take care of the modification > > in > > > the compiled plan. > > > > > > Examples: > > > > > > 1. If we try to start the same SQL with a new Flink cluster (one type > of > > > "restart") without knowing the modified compiled plan. The old > > > performance issue will rise again. This might happen when multiple > users > > > are working on the same project who run a working SQL job, get > > performance > > > issues, and have no clue since nothing has been changed. Or one user is > > > working on many SQL jobs who might lose the overview of which SQL job > has > > > modified plans or not. > > > 2. If a SQL has been changed in a backwards compatible way and > (re)start > > > with a given savepoint(NO_CLAIM), the version2 json plan has to be made > > > based on version1, as I mentioned previously, which means each time > when > > > the SQL got changed, the related compiled plan need modification too. > > > Beyond that , it would also be easily forgotten to do it if there were > no > > > connection between the SQL and the related modified compiled plan. 
The > > SQL > > > job will have the performance issue again after the change. > > > 3. Another scenario would be running a backwards compatible SQL job > with > > an > > > upgraded FLink version, additional upgrade logic or guideline should be > > > developed for e.g. ttl modification in the compiled plan, because > > upgraded > > > Flink engine underneath might lead to a different ttl setting. > > > 4. The last scenario is just like you described that SQL has been > changed > > > significantly so that the compiled operators will be changed too. The > > easy > > > way is to start a fresh new tuning. But since there was a tuning for > the > > > last SQL. User has to compare both compiled plans and copy/paste some > > ttls > > > that might still work. > > > > > > A virtualization tool could help but might not reduce those efforts > > > significantly, since the user behaviour is changed enormously. > > > > > > I was aware that the json string might be large. Doing(EXECUTE PLAN > 'json > > > plan as string') is intended to avoid dealing with files for most > common > > > cases where the json string has common length. > > > > > > Anyway, it should be fine, if it is only recommended for advanced use > > cases > > > where users are aware of those efforts. > > > > > > Best regards, > > > Jing > > > > > > On Sat, Mar 25, 2023 at 3:54 PM Jane Chan <qingyue....@gmail.com> > wrote: > > > > > > > Hi Leonard, Jing and Shengkai, > > > > > > > > Thanks so much for your insightful comments. Here are my thoughts > > > > > > > > @Shengkai > > > > > 1. How the Gateway users use this feature? As far as I know, the > > > EXEUCTE > > > > PLAN only supports local file right now. Is it possible to extend > this > > > > syntax to allow for reading plan files from remote file systems? > > > > > > > > Nice catch! 
Currently, the "COMPILE PLAN" and "EXECUTE PLAN" > statements > > > > only support a local file path without the scheme (see > > > > TableEnvironmentImpl.java#L773 > > > > < > > > > > > https://github.com/apache/flink/blob/80ee512f00a9a8873926626d66cdcc97164c4595/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L773 > > > >). > > > > It's reasonable to extend the support to Flink's FileSystem. Besides, > > the > > > > JSON plan should also be added to the resource cleaning mechanism for > > the > > > > Gateway mode, just like we do with the "ADD JAR" operation, cleaning > it > > > up > > > > when the session ends. I will take your suggestion and make changes > to > > > FLIP. > > > > > > > > > 2. I would like to inquire if there are any limitations on this > > > feature? > > > > I have encountered several instances where the data did not expire in > > the > > > > upstream operator, but it expired in the downstream operator, > resulting > > > in > > > > abnormal calculation results or direct exceptions thrown by the > > operator > > > > (e.g. rank operator). Can we limit that the expiration time of > > downstream > > > > operator data should be greater than or equal to the expiration time > of > > > > upstream operator data? > > > > > > > > This is an excellent point. In fact, the current state TTL is based > on > > > the > > > > initialization time of each operator, which is inherently unaligned. > > The > > > > probability of such unalignment is magnified now that fine-grained > > > > operator-level TTL is supported. While on the other hand, this FLIP > is > > > not > > > > the root cause of this issue. To systematically solve the problem of > > TTL > > > > unalignment between operators, I understand that we need a larger > FLIP > > to > > > > accomplish this. And I'll mention this point in the FLIP doc. WDYT? 
> > > > > > > > Back to your suggestions, in most scenarios, the TTL between multiple > > > > state operators should be non-monotonically decreasing, but there may > > be > > > > some exceptions, such as the SinkUpsertMaterializer introduced to > solve > > > the > > > > changelog disorder problem. It may not be appropriate if we block it > at > > > the > > > > implementation level. But it does happen that the users misconfigure > > the > > > > TTL, so in this case, my idea is that, since FLIP-280 > > > > < > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-280%3A+Introduce+EXPLAIN+PLAN_ADVICE+to+provide+SQL+advice > > > > > > > > introduces an experimental feature "EXPLAIN PLAN_ADVICE", and > FLIP-190 > > > > < > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-EXPLAIN > > > > > > also > > > > introduces a new syntax "EXPLAIN PLAN FOR '/foo/bar/sql.json'", what > if > > > we > > > > add a new plan analyzer, which will analyze the compiled plan to > > perform > > > > detection. The analyzer gives a warning attached to the optimized > > > physical > > > > plan when the TTL of the predecessor is larger than the TTL of the > > > > posterior. Will it draw the user's attention and make > troubleshooting > > > > easier? > > > > > > > > @Leonard and @Jing > > > > You both expressed the same concern about the high cost of > > understanding > > > > and changing the behavior of users using SQL. IMO as opposed to the > > usual > > > > features, fine-grained TTL configuration is a feature for advanced > > > users. I > > > > draw a pic to illustrate this. 
You can see this pic to estimate the > > > funnel > > > > conversion rate, from SQL jobs that involve stateful and > TTL-controlled > > > > operators to jobs that require only one TTL configuration to meet the > > > > requirements, to jobs that eventually require multiple TTL > > > configurations, > > > > which is in a decreasing distribution. The first and second-tier > users > > > > should not feel bothered about this. > > > > [image: image.png] > > > > We will explain in detail in the documentation how to use this > feature, > > > > how to do it, and it is a feature that needs to be used carefully. > > Also, > > > in > > > > conjunction with FLIP-280 and FLIP-190, we can print out the > > > SQL-optimized > > > > physical and execution plan for the JSON file (with tree style just > > like > > > > the normal EXPLAIN statement), would this help the advanced users > > > > understand the compiled JSON plan represents? > > > > > > > > > > > > @Jing > > > > > One thing I didn't fully understand. I might be wrong. Could those > > ttl > > > > configs be survived when SQL jobs are restarted? I have to always > call > > > the > > > > EXECUTE PLAN every time when the job needs to be restarted? > > > > > > > > If it's a new SQL job and has never been submitted before, and users > > want > > > > to enable the fine-grained state TTL control, then they will first > use > > > > COMPILE PLAN statement to generate the JSON file and modify the > > stateful > > > > operator's state metadata as needed, then submit the job via EXECUTE > > PLAN > > > > statement. By the word "restarted", I assume there are historical > > > instances > > > > before and users want to restore from some checkpoints or savepoints. 
> > > > Without SQL changes, users can directly use Flink CLI $ bin/flink run > > -s > > > > :savepointPath -restoreMode :mode -n [:runArgs] > > > > < > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints > > > > > > to > > > > resume/restart the job with savepoint. In this situation, the > > customized > > > > TTL is still in effect. > > > > > > > > > Does that mean that, once I modified the compiled sql plan, the > json > > > > file will become the sql job? If I am not mistaken, the compiled sql > > plan > > > > introduced by FLIP-190 is only used for SQL job migration/update. > > Common > > > > stages that Flink uses to produce the execution plan from SQL does > not > > > > contain the compiling step. > > > > > > > > I want to explain briefly SQL processing and what FLIP-190 achieves. > > All > > > > SQL jobs go through the following three steps to run, no matter > > > > with/without FLIP-190 > > > > <1> parsing into AST and then Operation by the parser; > > > > <2> optimizing the original rel with rule-based and cost-based > > optimizers > > > > into physical rel nodes and then exec nodes by the planner; > > > > <3> transforming exec nodes to transformations and then generating > > > > JobGraph and streamGraph to run. > > > > > > > > FLIP-190 serializes the result of step <2> as a side output in JSON > > > format > > > > and dumps it into a file. The file serves as a hooker to allow you to > > > make > > > > some changes (such as performing the plan/state migration or tuning > > state > > > > TTL for stateful operators), and then continue with step <3>. From > this > > > > point, I'd like to say FLIP-190 is introducing a > mechanism/possibility > > to > > > > allow some advanced configuration to happen during the intermediate > > step, > > > > not just a use case for migration/upgrade. 
> > > > > > > > > In case that the original SQL script has been changed, we need to > > > > compile a version2 sql plan and copy the ttl configs from version1 > SQL > > > plan > > > > to version2 and drop version1. This means we have to keep the > compiled > > > json > > > > file and create a link with the original SQL script. I am not sure > if I > > > > understood it correctly, it seems like a lot of maintenance effort. > > > > > The regular working process for Flink SQL users is changed, from > only > > > > dealing with SQL like scripts to moving between SQL like scripts and > > file > > > > modifications back and forth. This is a big change for user > behaviours. > > > > > > > > In fact, it's not just a copy-paste thing. SQL changes may result in > > more > > > > stateful operators or existing stateful operators being deleted, so > the > > > > user cannot simply copy the configuration from the previous JSON > file. > > > What > > > > they should do is carefully consider whether they still need to > enable > > > > fine-grained state TTL configuration for the current new version of > > SQL, > > > > and in which operators they need to configure, and how long the TTL > > > should > > > > be, and modify the new JSON file accordingly. > > > > > > > > > One option could be that we upgrade/extend the COMPILE PLAN to > allow > > > > users update ttl for operators at the script level. But I am not sure > > if > > > it > > > > is possible to point out specific operators at this level. Another > > option > > > > is to print out the result of COMPILE PLAN and enable EXECUTE PLAN > > 'json > > > > plan as string'. Third option is to leverage a data platform to > > > virtualize > > > > the compiled sql plan and provide related interactions for updating > ttl > > > and > > > > submit(execute) the modified compiled sql plan. > > > > > > > > The 1st option might not be feasible. SQL syntax is not easy to > extend > > > > especially for things beyond ANSI SQL standard. 
While for the 2nd > > option, > > > > in terms of practicality, given that JSON strings can be very long, I > > > don't > > > > think it's as convenient as the EXECUTE PLAN > > > > '/foo/bar/compiled-plan.json' statement, which is already supported > by > > > > FLIP-190. I agree with the 3rd option, and just as @Yun mentioned > > before, > > > > nothing better than a graphical IDE. I think this should be a very > > > helpful > > > > experience improvement for advanced users who want to tune > fine-grained > > > > configurations (not just state TTL) based on an optimized exec plan, > > and > > > > deserves another FLIP. WDYT? > > > > > > > > Best, > > > > Jane > > > > > > > > On Sat, Mar 25, 2023 at 7:27 AM Jing Ge <j...@ververica.com.invalid> > > > > wrote: > > > > > > > >> Thanks Jane for driving this FLIP. > > > >> > > > >> The FLIP is quite interesting. Since the execution plan has finer > > > >> granularity than the plain SQL script, Hints at SQL level might not > be > > > >> able > > > >> to touch specific operators, which turns out that the idea of > > leveraging > > > >> the compiled execution plan is brilliant. > > > >> > > > >> However, there are some concerns that might need to be considered. > > > >> > > > >> - One thing I didn't fully understand. I might be wrong. Could those > > ttl > > > >> configs be survived when SQL jobs are restarted? Does that mean > that, > > > once > > > >> I modified the compiled sql plan, the json file will become the sql > > > job? I > > > >> have to always call the EXECUTE PLAN every time when the job needs > to > > be > > > >> restarted? In case that the original SQL script has been changed, we > > > need > > > >> to compile a version2 sql plan and copy the ttl configs from > version1 > > > sql > > > >> plan to version2 and drop version1. This means we have to keep the > > > >> compiled > > > >> json file and create a link with the original SQL script. 
I am not > > sure > > > if > > > >> I understood it correctly, it seems like a lot of maintenance > effort. > > > >> - If I am not mistaken, the compiled sql plan introduced by FLIP-190 > > is > > > >> only used for SQL job migration/update. Common stages that Flink > uses > > to > > > >> produce the execution plan from SQL does not contain the compiling > > step. > > > >> This makes one tool do two different jobs[1], upgrade + ttl tuning. > > > >> and tighten the dependency on compiling sql plans. Flink SQL users > > have > > > to > > > >> deal with a compiled sql plan for performance optimization that is > not > > > >> designed for it. > > > >> - The regular working process for Flink SQL users is changed, from > > only > > > >> dealing with SQL like scripts to moving between SQL like scripts and > > > file > > > >> modifications back and forth. This is a big change for user > > behaviours. > > > >> One > > > >> option could be that we upgrade/extend the COMPILE PLAN to allow > users > > > >> update ttl for operators at the script level. But I am not sure if > it > > is > > > >> possible to point out specific operators at this level. Another > option > > > is > > > >> to print out the result of COMPILE PLAN and enable EXECUTE PLAN > 'json > > > plan > > > >> as string'. Third option is to leverage a data platform to > virtualize > > > the > > > >> compiled sql plan and provide related interactions for updating ttl > > and > > > >> submit(execute) the modified compiled sql plan. > > > >> > > > >> On the other side, there is one additional benefit with this > proposal: > > > we > > > >> could fine tune SQL jobs while we migrate/upgrade them. That is > nice! > > > >> > > > >> Best regards, > > > >> Jing > > > >> > > > >> [1] https://en.wikipedia.org/wiki/Single-responsibility_principle > > > >> > > > >> On Fri, Mar 24, 2023 at 4:02 PM Leonard Xu <xbjt...@gmail.com> > wrote: > > > >> > > > >> > Thanks Jane for the proposal. 
> > > >> > > > > >> > TTL of state is an execution phase configuration, serialized json > > > graph > > > >> > file is the graph for execution phase, supporting the operator > level > > > >> state > > > >> > TTL in the execution json file makes sense to me. > > > >> > > > > >> > From the user's perspective, I have two concerns: > > > >> > 1. By modifying the execution graph node configuration, this > raises > > > the > > > >> > cost for users to understand, especially for SQL users. > > > >> > 2. Submitting a SQL job through `exec plan json file` is not so > > > >> intuitive > > > >> > as users cannot see the SQL detail of the job > > > >> > > > > >> > Best, > > > >> > Leonard > > > >> > > > > >> > On Fri, Mar 24, 2023 at 5:07 PM Shengkai Fang <fskm...@gmail.com> > > > >> wrote: > > > >> > > > > >> > > Hi, Jane. > > > >> > > > > > >> > > Thanks for driving this FLIP and this feature are very useful to > > > many > > > >> > > users. But I have two problems about the FLIP: > > > >> > > > > > >> > > 1. How the Gateway users use this feature? As far as I know, the > > > >> EXEUCTE > > > >> > > PLAN only supports local file right now. Is it possible to > extend > > > >> this > > > >> > > syntax to allow for reading plan files from remote file systems? > > > >> > > > > > >> > > 2. I would like to inquire if there are any limitations on this > > > >> feature? > > > >> > I > > > >> > > have encountered several instances where the data did not expire > > in > > > >> the > > > >> > > upstream operator, but it expired in the downstream operator, > > > >> resulting > > > >> > in > > > >> > > abnormal calculation results or direct exceptions thrown by the > > > >> operator > > > >> > > (e.g. rank operator). Can we limit that the expiration time of > > > >> downstream > > > >> > > operator data should be greater than or equal to the expiration > > time > > > >> of > > > >> > > upstream operator data? 
> > > >> > > > > > >> > > Best, > > > >> > > Shengkai > > > >> > > > > > >> > > Yun Tang <myas...@live.com> 于2023年3月24日周五 14:50写道: > > > >> > > > > > >> > > > Hi, > > > >> > > > > > > >> > > > From my point of view, I am a bit against using SQL hint to > set > > > >> state > > > >> > TTL > > > >> > > > as FlinkSQL could be translated to several stateful operators. > > If > > > we > > > >> > want > > > >> > > > to let different state could have different TTL configs within > > one > > > >> > > > operator, the SQL hint solution could not work. A better way > is > > to > > > >> > allow > > > >> > > a > > > >> > > > graphical IDE to display the stateful operators and let users > > > >> configure > > > >> > > > them. And the IDE submits the json plan to Flink to run jobs. > > > >> > > > > > > >> > > > For the details of the structure of ExecNodes, since the state > > > name > > > >> is > > > >> > > > unique in the underlying state layer, shall we introduce the > > > "index" > > > >> > tag > > > >> > > to > > > >> > > > identify the state config? 
> > > >> > > > What will happen with the conditions below: > > > >> > > > 1st run: > > > >> > > > { > > > >> > > > "index": 0, > > > >> > > > "ttl": "259200000 ms", > > > >> > > > "name": "join-lef-state" > > > >> > > > }, > > > >> > > > { > > > >> > > > "index": 1, > > > >> > > > "ttl": "86400000 ms", > > > >> > > > "name": "join-right-state" > > > >> > > > } > > > >> > > > > > > >> > > > 2nd run: > > > >> > > > { > > > >> > > > "index": 0, > > > >> > > > "ttl": "86400000 ms", > > > >> > > > "name": "join-right-state" > > > >> > > > }, > > > >> > > > { > > > >> > > > "index": 1, > > > >> > > > "ttl": "259200000 ms", > > > >> > > > "name": "join-lef-state" > > > >> > > > } > > > >> > > > > > > >> > > > Best > > > >> > > > Yun Tang > > > >> > > > ________________________________ > > > >> > > > From: Jane Chan <qingyue....@gmail.com> > > > >> > > > Sent: Friday, March 24, 2023 11:57 > > > >> > > > To: dev@flink.apache.org <dev@flink.apache.org> > > > >> > > > Subject: Re: [DISCUSS] FLIP-292: Support configuring state TTL > > at > > > >> > > operator > > > >> > > > level for Table API & SQL programs > > > >> > > > > > > >> > > > Hi Shammon and Shuo, > > > >> > > > > > > >> > > > Thanks for your valuable comments! > > > >> > > > > > > >> > > > Some thoughts: > > > >> > > > > > > >> > > > @Shuo > > > >> > > > > I think it's more properly to say that hint does not affect > > the > > > >> > > > equivalenceof execution plans (hash agg vs sort agg), not the > > > >> > equivalence > > > >> > > > of execution > > > >> > > > results, e.g., users can set 'scan.startup.mode' for kafka > > > >> connector by > > > >> > > > dynamic table option, which > > > >> > > > also "intervene in the calculation of data results". > > > >> > > > > > > >> > > > IMO, the statement that "hint should not interfere with the > > > >> calculation > > > >> > > > results", means it should not interfere with internal > > computation. 
> > > >> On > > > >> > the > > > >> > > > other hand, 'scan.startup.mode' interferes with the ingestion > of > > > the > > > >> > > data. > > > >> > > > I think these two concepts are different, but of course, this > is > > > >> just > > > >> > my > > > >> > > > opinion and welcome other views. > > > >> > > > > > > >> > > > > I think the final shape of state ttl configuring may like > the > > > >> that, > > > >> > > > userscan define operator state ttl using SQL HINT > > (assumption...), > > > >> but > > > >> > it > > > >> > > > may > > > >> > > > affects more than one stateful operators inside the same query > > > >> block, > > > >> > > then > > > >> > > > users can further configure a specific one by modifying the > > > compiled > > > >> > json > > > >> > > > plan... > > > >> > > > > > > >> > > > Setting aside the issue of semantics, setting TTL from a > higher > > > >> level > > > >> > > seems > > > >> > > > to be attractive. This means that users only need to configure > > > >> > > > 'table.exec.state.ttl' through the existing hint syntax to > > achieve > > > >> the > > > >> > > > effect. Everything is a familiar formula. But is it really the > > > case? > > > >> > > Hints > > > >> > > > apply to a very broad range. Let me give an example. > > > >> > > > > > > >> > > > Suppose a user wants to set different TTLs for the two streams > > in > > > a > > > >> > > stream > > > >> > > > join query. Where should the hints be written? > > > >> > > > > > > >> > > > -- the original query before configuring state TTL > > > >> > > > create temporary view view1 as select .... from my_table_1; > > > >> > > > create temporary view view2 as select .... 
from my_table_2;
create temporary view joined_view as
select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;

Option 1: declaring hints at the very beginning, on the table scans

-- should he or she write the hints when declaring the temporary views?
create temporary view view1 as select .... from my_table_1 /*+ OPTIONS('table.exec.state.ttl' = 'foo') */;
create temporary view view2 as select .... from my_table_2 /*+ OPTIONS('table.exec.state.ttl' = 'bar') */;
create temporary view joined_view as
select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;

Option 2: declaring hints when performing the join

-- or should he or she write the hints when declaring the joined view?
create temporary view view1 as select .... from my_table_1;
create temporary view view2 as select .... from my_table_2;
create temporary view joined_view as
select a.*, b.* from view1 /*+ OPTIONS('table.exec.state.ttl' = 'foo') */ a join view2 /*+ OPTIONS('table.exec.state.ttl' = 'bar') */ b on a.join_key = b.join_key;

From the user's point of view, does he or she need to care about the difference between these two styles? Users might think the two are equivalent; but in reality, as developers, how do we define the range in which a hint starts and ends to take effect?
Consider the following two assumptions.

1. Assume the hint takes effect from the moment it is declared and applies to every subsequent stateful operator until it is overridden by a new hint.
Under this assumption, it is clear that Option 1 and Option 2 are different, because a ChangelogNormalize node can appear between the scan and the join. Meanwhile, which stream's TTL should apply to the query following the stream join? It is unclear if the user does not explicitly set it. Should the engine make a random decision?

2. Assume the scope of the hint only applies to the current query block and does not extend to the next operator.
In this case, the first way of setting the hint will not work, because the hint cannot be carried to the join operator. Users must choose the second way to configure it. Are users willing to remember this strange constraint on SQL writing style? Does this introduce a new learning cost?

The example above illustrates that while this approach may seem simple and direct, it actually has many limitations and may produce unexpected behavior. Will users still find it attractive? IMO *hints only work for a very limited situation where the query is very simple, and their scope is more coarse-grained and not operator-level*.
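To make the ambiguity under assumption 1 concrete, here is a small sketch building on the join example above (the downstream aggregate query is added purely for illustration and is not part of the original example):

-- hints on each join input, as in Option 2
create temporary view joined_view as
select a.*, b.* from
  view1 /*+ OPTIONS('table.exec.state.ttl' = 'foo') */ a
  join view2 /*+ OPTIONS('table.exec.state.ttl' = 'bar') */ b
  on a.join_key = b.join_key;

-- the group aggregate below is also stateful: under assumption 1,
-- should its state TTL be 'foo', 'bar', or neither? The user has not said.
select join_key, count(*) from joined_view group by join_key;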
Maybe it deserves another FLIP to discuss whether we need a multiple-level state TTL configuration mechanism and how to properly implement it.

@Shammon
> Generally, Flink jobs support two types of submission: SQL and jar. If users want to use `TTL on Operator` for SQL jobs, they need to edit the JSON file, which is not supported by general job submission systems such as Flink SQL Client, Apache Kyuubi, Apache StreamPark, etc. Users need to download the file and edit it manually, but they may not have permission to the storage system, such as HDFS, in a real production environment. From this perspective, I think it is necessary to provide a way similar to hints so that users can configure the `TTL on Operator` in their SQL, which helps them use it conveniently.

IIUC, the SQL Client supports the statement "EXECUTE PLAN 'file:/foo/bar/example.json'". So I think there is not much evidence that we should choose hints just because users cannot touch their development environment. As replied to @Shuo, the TTL set through a hint is not at the operator level, and whether it is really "convenient" needs more discussion.

> I agree with @Shuo's idea that for complex cases, users can combine hints and the `json plan` to configure `TTL on Operator` better.
Suppose users can configure TTL through
<1> SET 'table.exec.state.ttl' = 'foo';
<2> Modifying the compiled JSON plan;
<3> Using hints (personally I'm strongly against this way, but let's take it into consideration).
IMO, if users can configure the same parameter in so many ways, then the complex case only makes things worse. Who has higher priority, and who overrides whom?

Best,
Jane

On Fri, Mar 24, 2023 at 11:00 AM Shammon FY <zjur...@gmail.com> wrote:

Hi Jane,

Thanks for initializing this discussion. Configuring TTL per operator can help users manage state more effectively.

I think the `compiled json plan` proposal may need to consider the impact on the user's submission workflow. Generally, Flink jobs support two types of submission: SQL and jar. If users want to use `TTL on Operator` for SQL jobs, they need to edit the JSON file, which is not supported by general job submission systems such as Flink SQL Client, Apache Kyuubi, Apache StreamPark, etc. Users need to download the file and edit it manually, but they may not have permission to the storage system, such as HDFS, in a real production environment.
From this perspective, I think it is necessary to provide a way similar to hints so that users can configure the `TTL on Operator` in their SQL, which helps them use it conveniently. At the same time, I agree with @Shuo's idea that for complex cases, users can combine hints and the `json plan` to configure `TTL on Operator` better. What do you think? Thanks

Best,
Shammon FY

On Thu, Mar 23, 2023 at 9:58 PM Shuo Cheng <njucs...@gmail.com> wrote:

Correction: "users can set 'scan.startup.mode' for kafka connector" -> "users can set 'scan.startup.mode' for kafka connector by dynamic table option"

Shuo Cheng <njucs...@gmail.com> wrote on Thursday, March 23, 2023, 21:50:

Hi Jane,
Thanks for driving this; operator-level state TTL is absolutely a desired feature. I would share my opinion as follows:

If the scope of this proposal is limited to an enhancement of the compiled JSON plan, it makes sense. I think it does not conflict with configuring state TTL in other ways, e.g., a SQL HINT or something else, because they just work at different levels: a SQL hint works at the exact entrance of the SQL API, while the compiled JSON plan is the intermediate result for SQL.
I think the final shape of state TTL configuration may look like this: users can define operator state TTL using a SQL HINT (assumption...), but it may affect more than one stateful operator inside the same query block; then users can further configure a specific one by modifying the compiled JSON plan...

In a word, this proposal is in good shape as an enhancement for the compiled JSON plan, and it is orthogonal to other approaches, like a SQL hint, which work at a higher level.

Nits:

> "From the SQL semantic perspective, hints cannot intervene in the calculation of data results."
I think it's more proper to say that a hint does not affect the equivalence of execution plans (hash agg vs. sort agg), not the equivalence of execution results; e.g., users can set 'scan.startup.mode' for the Kafka connector, which also "intervenes in the calculation of data results".
Sincerely,
Shuo

On Tue, Mar 21, 2023 at 7:52 PM Jane Chan <qingyue....@gmail.com> wrote:

Hi devs,

I'd like to start a discussion on FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs [1].

Currently, we only support job-level state TTL configuration via 'table.exec.state.ttl'. However, users may expect fine-grained state TTL control to optimize state usage. Hence we propose to serialize/deserialize the state TTL as metadata of the operator's state to/from the compiled JSON plan, to achieve the goal of specifying different state TTLs when transforming the exec nodes into stateful operators.

Look forward to your opinions!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951

Best Regards,
Jane Chan
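For readers following the proposal, the compiled-plan workflow under discussion can be sketched roughly as follows, assuming the COMPILE PLAN / EXECUTE PLAN statements introduced by FLIP-190 (the file path, sink table, and query here are hypothetical; the editable "state" metadata would take the shape shown in Yun Tang's example above):

-- compile the statement into a JSON plan file
COMPILE PLAN 'file:/foo/bar/example.json' FOR
INSERT INTO sink_table
SELECT a.*, b.* FROM view1 a JOIN view2 b ON a.join_key = b.join_key;

-- edit the "state" metadata of the join node in example.json by hand,
-- e.g. change "ttl" for "join-left-state", then run the edited plan:
EXECUTE PLAN 'file:/foo/bar/example.json';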