Hi devs,

Thanks for your valuable feedback. I've changed the title of the FLIP to "Enhance COMPILED PLAN to support operator-level state TTL configuration" and added an explanation of StateMetadata. If you have any concerns, please let me know.
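To make the per-input TTL idea concrete, here is a minimal, self-contained Java sketch (Java 16+, since it uses a record) of how TTLs could be resolved from a StateMetadata-like list. It mirrors the "for backward compatibility" branch in the StreamExecJoin snippet quoted below: plans without state metadata fall back to the global TTL. The `StateMetadata` record, the `multiInputDefaultMeta` helper and `resolveTtl` are hypothetical stand-ins written for illustration. They are not Flink's actual classes or API, and TTLs are modelled as `java.time.Duration` rather than the `long` values used in the snippet.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class StateTtlResolutionSketch {

    // Hypothetical per-input state entry (illustration only, NOT Flink's StateMetadata class).
    record StateMetadata(int inputIndex, String stateName, Duration stateTtl) {}

    // Hypothetical helper: one entry per input, all initialized with the same default TTL.
    static List<StateMetadata> multiInputDefaultMeta(Duration defaultTtl, String... stateNames) {
        List<StateMetadata> result = new ArrayList<>();
        for (int i = 0; i < stateNames.length; i++) {
            result.add(new StateMetadata(i, stateNames[i], defaultTtl));
        }
        return result;
    }

    // Plans compiled before the change carry no state metadata, so fall back to the
    // global TTL (the role played by "table.exec.state.ttl" in the snippet).
    static Duration resolveTtl(List<StateMetadata> stateMetadataList, int inputIndex, Duration globalTtl) {
        if (stateMetadataList == null || stateMetadataList.isEmpty()) {
            return globalTtl;
        }
        return stateMetadataList.get(inputIndex).stateTtl();
    }

    public static void main(String[] args) {
        Duration globalTtl = Duration.ofHours(1);

        // Old plan without state metadata: both join inputs use the global TTL.
        System.out.println(resolveTtl(null, 0, globalTtl)); // PT1H
        System.out.println(resolveTtl(null, 1, globalTtl)); // PT1H

        // New plan in which the user edited only the left input's TTL to one day.
        List<StateMetadata> meta = multiInputDefaultMeta(globalTtl, "leftState", "rightState");
        meta.set(0, new StateMetadata(0, "leftState", Duration.ofDays(1)));
        System.out.println(resolveTtl(meta, 0, globalTtl)); // PT24H
        System.out.println(resolveTtl(meta, 1, globalTtl)); // PT1H
    }
}
```

Treating a missing or empty list as "use the global TTL" is what lets plans compiled before the change keep their current behaviour, while a newer plan can carry one entry per input that users edit independently.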
Best regards,
Jane

On Mon, Apr 3, 2023 at 11:42 PM Jane Chan <qingyue....@gmail.com> wrote:
> Hi Timo,
>
> Thanks for your valuable feedback. Let me explain the design details.
>
> > However, actually fine-grained state TTL should already be possible
> > today. I don't fully understand where your proposed StateMetadata is
> > located. Would this be a value of @ExecNodeMetadata, StreamExecNode, or
> > TwoInputStreamOperator?
>
> Currently, all ExecNodes that support JSON SerDe are annotated with
> @ExecNodeMetadata. This annotation interface has a key called
> consumedOptions, which persists all configuration that affects the
> topology. For ExecNodes that translate to a OneInputStreamOperator, adding
> "table.exec.state.ttl" to consumedOptions is enough to configure TTL at a
> fine granularity. However, this is not a general solution for ExecNodes
> that translate to a TwoInputStreamOperator or MultipleInputStreamOperator,
> because we may need to set different TTLs for the left/right (or k-th)
> input stream, and we do not want to introduce configurations like
> "table.exec.left-state.ttl", "table.exec.right-state.ttl" or
> "table.exec.kth-input-state.ttl".
>
> The proposed StateMetadata will be a member variable of the ExecNodes that
> translate to stateful operators, similar to inputProperties (although
> inputProperties is shared by all ExecNodes).
> I'd like to illustrate this with the following code snippet for
> StreamExecJoin.
>
> @ExecNodeMetadata(
>         name = "stream-exec-join",
>         version = 1,
>         producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>         minPlanVersion = FlinkVersion.v1_15,
>         minStateVersion = FlinkVersion.v1_15)
> +@ExecNodeMetadata(
> +        name = "stream-exec-join",
> +        version = 2,
> +        producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
> +        minPlanVersion = FlinkVersion.v1_18,
> +        minStateVersion = FlinkVersion.v1_15)
> public class StreamExecJoin extends ExecNodeBase<RowData>
>         implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
>
> +    @Nullable
> +    @JsonProperty(FIELD_NAME_STATE)
> +    private final List<StateMetadata> stateMetadataList;
>
>     public StreamExecJoin(
>             ReadableConfig tableConfig,
>             JoinSpec joinSpec,
>             List<int[]> leftUpsertKeys,
>             List<int[]> rightUpsertKeys,
>             InputProperty leftInputProperty,
>             InputProperty rightInputProperty,
>             RowType outputType,
>             String description) {
>         this(
>                 ExecNodeContext.newNodeId(),
>                 ExecNodeContext.newContext(StreamExecJoin.class),
>                 ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig),
>                 joinSpec,
>                 leftUpsertKeys,
>                 rightUpsertKeys,
>                 Lists.newArrayList(leftInputProperty, rightInputProperty),
>                 outputType,
>                 description,
> +               StateMetadata.multiInputDefaultMeta(tableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME));
>     }
>
>     @JsonCreator
>     public StreamExecJoin(
>             @JsonProperty(FIELD_NAME_ID) int id,
>             @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
>             @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
>             @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
>             @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) List<int[]> leftUpsertKeys,
>             @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) List<int[]> rightUpsertKeys,
>             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
>             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
>             @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
> +           @Nullable
> +           @JsonProperty(FIELD_NAME_STATE) List<StateMetadata> stateMetadataList) {
>         super(id, context, persistedConfig, inputProperties, outputType, description);
>         checkArgument(inputProperties.size() == 2);
>         this.joinSpec = checkNotNull(joinSpec);
>         this.leftUpsertKeys = leftUpsertKeys;
>         this.rightUpsertKeys = rightUpsertKeys;
> +       this.stateMetadataList = stateMetadataList;
>     }
>
>     @Override
>     @SuppressWarnings("unchecked")
>     protected Transformation<RowData> translateToPlanInternal(
>             PlannerBase planner, ExecNodeConfig config) {
>         final ExecEdge leftInputEdge = …;
>         final ExecEdge rightInputEdge = …;
>         . . .
>
>         // for backward compatibility
>         long leftStateRetentionTime =
>                 isNullOrEmpty(stateMetadataList)
>                         ? config.getStateRetentionTime()
>                         : stateMetadataList.get(0).getStateTtl();
>         long rightStateRetentionTime =
>                 isNullOrEmpty(stateMetadataList)
>                         ? leftStateRetentionTime
>                         : stateMetadataList.get(1).getStateTtl();
>
>         AbstractStreamingJoinOperator operator;
>         FlinkJoinType joinType = joinSpec.getJoinType();
>         if (joinType == FlinkJoinType.ANTI || joinType == FlinkJoinType.SEMI) {
>             operator =
>                     new StreamingSemiAntiJoinOperator(
>                             joinType == FlinkJoinType.ANTI,
>                             leftTypeInfo,
>                             rightTypeInfo,
>                             generatedCondition,
>                             leftInputSpec,
>                             rightInputSpec,
>                             joinSpec.getFilterNulls(),
>                             leftStateRetentionTime,
> +                           rightStateRetentionTime);
>         } else {
>             operator =
>                     new StreamingJoinOperator(
>                             leftTypeInfo,
>                             rightTypeInfo,
>                             generatedCondition,
>                             leftInputSpec,
>                             rightInputSpec,
>                             leftIsOuter,
>                             rightIsOuter,
>                             joinSpec.getFilterNulls(),
>                             leftStateRetentionTime,
> +                           rightStateRetentionTime);
>         }
>
>         . . .
>         return transform;
>     }
> }
>
> Best regards,
> Jane
>
> On Mon, Apr 3, 2023 at 10:01 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Jane,
>>
>> Thanks for proposing this FLIP. More state insights and fine-grained
>> state TTL are frequently requested features for Flink SQL. Eventually,
>> we need to address this.
>>
>> I agree with the previous responses that doing this with a hint might
>> cause more confusion than it actually helps. We should use hints only if
>> they can be placed close to an operation (e.g. a JOIN or a table), and
>> only where a global flag for the entire query, set via SET, is not
>> sufficient.
>>
>> In general, I support the current direction of the FLIP and continuing
>> the vision of FLIP-190. However, actually fine-grained state TTL should
>> already be possible today. Maybe this is untested so far, but we largely
>> reworked how configuration works within the planner in Flink 1.15.
>>
>> As you briefly mentioned in the FLIP, ExecNodeConfig [1] already combines
>> configuration coming from TableConfig with per-ExecNode config.
>> In fact, state TTL from the JSON plan should already take precedence
>> over TableConfig.
>>
>> It would be great to extend the meta-information of ExecNodes with state
>> insights. I don't fully understand where your proposed StateMetadata is
>> located. Would this be a value of @ExecNodeMetadata, StreamExecNode, or
>> TwoInputStreamOperator?
>>
>> I think it should be a combination of ExecNodeMetadata with rough
>> estimates (declaration) or StreamExecNode, but it should not bubble into
>> TwoInputStreamOperator.
>>
>> Regards,
>> Timo
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
>>
>> On 03.04.23 09:15, godfrey he wrote:
>> > Hi Jane,
>> >
>> > Thanks for driving this FLIP.
>> >
>> > I think the compiled plan solution and the hint solution do not
>> > conflict; the two can exist at the same time.
>> > The compiled plan solution can address the needs of advanced users and
>> > platform users, allowing the user to define the state TTL of every
>> > stateful operator, while the hint solution can address some specific
>> > simple scenarios and is very user-friendly, convenient, and
>> > unambiguous to use.
>> >
>> > Some stateful operators are not compiled from SQL directly, such as
>> > ChangelogNormalize and SinkUpsertMaterializer mentioned above. I notice
>> > that the example given by Yisha has a hint propagation problem, which
>> > does not conform to the current design.
>> > Roughly, the hint solution should be simple (only common operators are
>> > supported) and easy to understand (no hint propagation).
>> >
>> > If the hint solution is supported, the state TTL parts of a compiled
>> > plan generated from a query with state TTL hints can still be modified
>> > further.
>> >
>> > So, I would prefer the hint solution to be discussed in a separate FLIP.
>> > I think that FLIP may need a lot of discussion.
>> >
>> > Best,
>> > Godfrey
>> >
>> > On Thu, Mar 30, 2023 at 22:04, 周伊莎 <zhouyi...@bytedance.com.invalid> wrote:
>> >>
>> >> Hi Jane,
>> >>
>> >> Thanks for your detailed response.
>> >>
>> >>> You mentioned that there are 10k+ SQL jobs in your production
>> >>> environment, but only ~100 jobs' migration involves plan editing. Is
>> >>> 10k+ the number of total jobs, or the number of jobs that use stateful
>> >>> computation and need state migration?
>> >>
>> >> 10k is the number of SQL jobs with periodic checkpointing enabled. And
>> >> of course, if users change their SQL in a way that changes the plan,
>> >> they need to do state migration.
>> >>
>> >>> - You mentioned that "A truth that can not be ignored is that users
>> >>> usually tend to give up editing TTL (or operator ID in our case)
>> >>> instead of migrating this configuration between their versions of one
>> >>> given job." So what would users prefer to do if they're reluctant to
>> >>> edit the operator ID? Would they submit the same SQL as a new job with
>> >>> a higher version to re-accumulate the state from the earliest offset?
>> >>
>> >> You're exactly right. People tend to re-accumulate the state from a
>> >> given offset by changing the namespace of their checkpoint.
>> >> Namespace is an internal concept, and restarting the SQL job in a new
>> >> namespace can simply be understood as submitting a new job.
>> >>
>> >>> Back to your suggestions, I noticed that FLIP-190 [3] proposed the
>> >>> following syntax to perform plan migration
>> >>
>> >> The 'plan migration' I mentioned in my last reply may be inaccurate.
>> >> It's more like 'query evolution'. In other words, if a user submitted a
>> >> SQL job with a configured compiled plan and then changes the SQL, the
>> >> compiled plan changes too; how do we move the configuration in the old
>> >> plan to the new plan?
>> >> IIUC, FLIP-190 aims to solve issues in Flink version upgrades and leaves
>> >> out 'query evolution', which is a fundamental change to the query, e.g.
>> >> adding a filter condition or a different aggregation.
>> >> And I'm really looking forward to a solution for query evolution.
>> >>
>> >>> And I'm also curious about how to use the hint
>> >>> approach to cover cases like
>> >>>
>> >>> - configuring TTL for operators like ChangelogNormalize,
>> >>> SinkUpsertMaterializer, etc., which are derived by the planner
>> >>> implicitly
>> >>> - coping with the state TTL of two/multiple-input stream operators,
>> >>> like join, and other operations like row_number, rank, correlate, etc.
>> >>
>> >> Actually, in our company, all operators in the query block where the
>> >> hint is located are affected by that hint. For example,
For example, >> >> >> >> INSERT INTO sink >> >>> SELECT /*+ STATE_TTL('1D') */ >> >>> id, >> >>> name, >> >>> num >> >>> FROM ( >> >>> SELECT >> >>> *, >> >>> ROW_NUMBER() OVER (PARTITION BY id ORDER BY num DESC) as >> row_num >> >>> FROM ( >> >>> SELECT >> >>> * >> >>> FROM ( >> >>> SELECT >> >>> id, >> >>> name, >> >>> max(num) as num >> >>> FROM source1 >> >>> GROUP BY >> >>> id, name, TUMBLE(proc, INTERVAL '1' MINUTE) >> >>> ) >> >>> GROUP BY >> >>> id, name, num >> >>> ) >> >>> ) >> >>> WHERE row_num = 1 >> >>> >> >> >> >> In the SQL above, the state TTL of Rank and Agg will be all configured >> as 1 >> >> day. If users want to set different TTL for Rank and Agg, they can >> just >> >> make these two queries located in two different query blocks. >> >> It looks quite rough but straightforward enough. For each side of join >> >> operator, one of my users proposed a syntax like below: >> >> >> >>> /*+ >> JOIN_TTL('tables'='left_talbe,right_table','left_ttl'='100000','right_ttl'='10000') >> */ >> >>> >> >>> We haven't accepted this proposal now, maybe we could find some better >> >> design for this kind of case. Just for your information. >> >> >> >> I think if we want to utilize hints to support fine-grained >> configuration, >> >> we can open a new FLIP to discuss it. >> >> BTW, personally, I'm interested in how to design a graphical interface >> to >> >> help users to maintain their custom fine-grained configuration between >> >> their job versions. >> >> >> >> Best regards, >> >> Yisha >> > >> >>