Hi devs,

Thanks for all the feedback.
Based on the discussion [1], we seem to have reached a consensus so far, so I
would like to start a vote on FLIP-292 [2], which will begin next Monday
(Apr. 10th at 10:00 AM GMT). If you have any questions or concerns, please
don't hesitate to follow up on this discussion.

[1] https://lists.apache.org/thread/ffmc96gv8ofoskbxlhtm7w8oxv8nqzct
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951&show-miniview

Best regards,
Jane

On Thu, Apr 6, 2023 at 10:33 AM Jane Chan <qingyue....@gmail.com> wrote:

> Hi devs,
>
> Thanks for your valuable feedback. I've changed the title of the FLIP to
> "Enhance COMPILED PLAN to support operator-level state TTL configuration"
> and added an explanation for StateMetadata. If you have any concerns,
> please let me know.
>
> Best regards,
> Jane
>
> On Mon, Apr 3, 2023 at 11:42 PM Jane Chan <qingyue....@gmail.com> wrote:
>
>> Hi Timo,
>>
>> Thanks for your valuable feedback. Let me explain the design details.
>>
>> > However, actually fine-grained state TTL should already be possible
>> > today. I don't fully understand where your proposed StateMetadata is
>> > located? Would this be a value of @ExecNodeMetadata, StreamExecNode,
>> > or TwoInputStreamOperator?
>>
>> Currently, all ExecNodes that support JSON SerDe are annotated with
>> @ExecNodeMetadata. This annotation interface has a key called
>> consumedOptions, which persists all configuration that affects the
>> topology. For ExecNodes that translate to OneInputStreamOperator, adding
>> "table.exec.state.ttl" to consumedOptions is enough to achieve the goal
>> of configuring TTL with fine granularity. However, this is not a
>> generalized solution for ExecNodes that translate to
>> TwoInputStreamOperator or MultipleInputStreamOperator, because we may
>> need to set different TTLs for the left/right (or k-th) input stream,
>> and we do not want to introduce configurations like
>> "table.exec.left-state.ttl", "table.exec.right-state.ttl", or
>> "table.exec.kth-input-state.ttl".
>>
>> The proposed StateMetadata will be a member variable of the ExecNodes
>> that translate to stateful operators, similar to inputProperties (which
>> is shared by all ExecNodes, though). I'd like to illustrate this in the
>> following snippet of code for StreamExecJoin.
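>>
>> Before that snippet, here is a minimal sketch of what StateMetadata itself
>> could look like. The field names, the JSON layout in the comment, and the
>> use of ExecutionConfigOptions.IDLE_STATE_RETENTION are my illustrative
>> assumptions rather than the final API; only getStateTtl() and
>> multiInputDefaultMeta(...) are implied by the StreamExecJoin changes below.
>>
>> import java.time.Duration;
>> import java.util.ArrayList;
>> import java.util.List;
>>
>> import org.apache.flink.configuration.ReadableConfig;
>> import org.apache.flink.table.api.config.ExecutionConfigOptions;
>>
>> /**
>>  * Hypothetical sketch: per-state metadata persisted with a stateful ExecNode.
>>  * A stateful node in the compiled plan could then carry something like
>>  * (illustrative only):
>>  *   "state": [
>>  *     { "index": 0, "ttl": 3600000, "name": "leftState" },
>>  *     { "index": 1, "ttl": 7200000, "name": "rightState" }
>>  *   ]
>>  */
>> public class StateMetadata {
>>
>>     // 0-based index of the input this state belongs to (0 = left, 1 = right, ...).
>>     private final int stateIndex;
>>     // Retention time in milliseconds, editable per operator (and per input)
>>     // in the serialized plan.
>>     private final long stateTtl;
>>     // Human-readable name such as "leftState"/"rightState" to keep the plan
>>     // self-describing.
>>     private final String stateName;
>>
>>     public StateMetadata(int stateIndex, long stateTtl, String stateName) {
>>         this.stateIndex = stateIndex;
>>         this.stateTtl = stateTtl;
>>         this.stateName = stateName;
>>     }
>>
>>     public long getStateTtl() {
>>         return stateTtl;
>>     }
>>
>>     // Default metadata for a multi-input ExecNode: every state starts from
>>     // the job-level "table.exec.state.ttl" value in TableConfig.
>>     public static List<StateMetadata> multiInputDefaultMeta(
>>             ReadableConfig tableConfig, String... stateNames) {
>>         Duration globalTtl = tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION);
>>         List<StateMetadata> metadata = new ArrayList<>();
>>         for (int i = 0; i < stateNames.length; i++) {
>>             metadata.add(new StateMetadata(i, globalTtl.toMillis(), stateNames[i]));
>>         }
>>         return metadata;
>>     }
>> }
>>
>> With per-input entries like these, StreamExecJoin can pick a TTL by input
>> index, as the following snippet shows.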
>>
>> @ExecNodeMetadata(
>>     name = "stream-exec-join",
>>     version = 1,
>>     producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>>     minPlanVersion = FlinkVersion.v1_15,
>>     minStateVersion = FlinkVersion.v1_15)
>> +@ExecNodeMetadata(
>> +    name = "stream-exec-join",
>> +    version = 2,
>> +    producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>> +    minPlanVersion = FlinkVersion.v1_18,
>> +    minStateVersion = FlinkVersion.v1_15)
>> public class StreamExecJoin extends ExecNodeBase<RowData>
>>         implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
>>
>> +    @Nullable
>> +    @JsonProperty(FIELD_NAME_STATE)
>> +    private final List<StateMetadata> stateMetadataList;
>>
>>     public StreamExecJoin(
>>             ReadableConfig tableConfig,
>>             JoinSpec joinSpec,
>>             List<int[]> leftUpsertKeys,
>>             List<int[]> rightUpsertKeys,
>>             InputProperty leftInputProperty,
>>             InputProperty rightInputProperty,
>>             RowType outputType,
>>             String description) {
>>         this(
>>                 ExecNodeContext.newNodeId(),
>>                 ExecNodeContext.newContext(StreamExecJoin.class),
>>                 ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig),
>>                 joinSpec,
>>                 leftUpsertKeys,
>>                 rightUpsertKeys,
>>                 Lists.newArrayList(leftInputProperty, rightInputProperty),
>>                 outputType,
>>                 description,
>> +                StateMetadata.multiInputDefaultMeta(tableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME));
>>     }
>>
>>     @JsonCreator
>>     public StreamExecJoin(
>>             @JsonProperty(FIELD_NAME_ID) int id,
>>             @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
>>             @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
>>             @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
>>             @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) List<int[]> leftUpsertKeys,
>>             @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) List<int[]> rightUpsertKeys,
>>             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
>>             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
>>             @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
>> +            @Nullable @JsonProperty(FIELD_NAME_STATE) List<StateMetadata> stateMetadataList) {
>>         super(id, context, persistedConfig, inputProperties, outputType, description);
>>         checkArgument(inputProperties.size() == 2);
>>         this.joinSpec = checkNotNull(joinSpec);
>>         this.leftUpsertKeys = leftUpsertKeys;
>>         this.rightUpsertKeys = rightUpsertKeys;
>> +        this.stateMetadataList = stateMetadataList;
>>     }
>>
>>     @Override
>>     @SuppressWarnings("unchecked")
>>     protected Transformation<RowData> translateToPlanInternal(
>>             PlannerBase planner, ExecNodeConfig config) {
>>         final ExecEdge leftInputEdge = …;
>>         final ExecEdge rightInputEdge = …;
>>         ...
>>
>>         // for backward compatibility
>>         long leftStateRetentionTime =
>>                 isNullOrEmpty(stateMetadataList)
>>                         ? config.getStateRetentionTime()
>>                         : stateMetadataList.get(0).getStateTtl();
>>         long rightStateRetentionTime =
>>                 isNullOrEmpty(stateMetadataList)
>>                         ? leftStateRetentionTime
>>                         : stateMetadataList.get(1).getStateTtl();
>>
>>         AbstractStreamingJoinOperator operator;
>>         FlinkJoinType joinType = joinSpec.getJoinType();
>>         if (joinType == FlinkJoinType.ANTI || joinType == FlinkJoinType.SEMI) {
>>             operator =
>>                     new StreamingSemiAntiJoinOperator(
>>                             joinType == FlinkJoinType.ANTI,
>>                             leftTypeInfo,
>>                             rightTypeInfo,
>>                             generatedCondition,
>>                             leftInputSpec,
>>                             rightInputSpec,
>>                             joinSpec.getFilterNulls(),
>>                             leftStateRetentionTime,
>> +                            rightStateRetentionTime);
>>         } else {
>>             operator =
>>                     new StreamingJoinOperator(
>>                             leftTypeInfo,
>>                             rightTypeInfo,
>>                             generatedCondition,
>>                             leftInputSpec,
>>                             rightInputSpec,
>>                             leftIsOuter,
>>                             rightIsOuter,
>>                             joinSpec.getFilterNulls(),
>>                             leftStateRetentionTime,
>> +                            rightStateRetentionTime);
>>         }
>>
>>         ...
>>         return transform;
>>     }
>> }
>>
>> Best regards,
>> Jane
>>
>> On Mon, Apr 3, 2023 at 10:01 PM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Jane,
>>>
>>> Thanks for proposing this FLIP. More state insights and fine-grained
>>> state TTL are frequently requested features for Flink SQL. Eventually,
>>> we need to address this.
>>>
>>> I agree with the previous responses that doing this with a hint might
>>> cause more confusion than it actually helps. We should use hints only
>>> if they can be placed close to an operation (e.g. a JOIN or a table),
>>> and only where a global flag for the entire query, set via SET, is not
>>> sufficient.
>>>
>>> In general, I support the current direction of the FLIP and continuing
>>> the vision of FLIP-190. However, fine-grained state TTL should actually
>>> already be possible today. Maybe this is untested so far, but we largely
>>> reworked how configuration works within the planner in Flink 1.15.
>>>
>>> As you quickly mentioned in the FLIP, ExecNodeConfig [1] already
>>> combines the configuration coming from TableConfig with the
>>> per-ExecNode config. Actually, state TTL from the JSON plan should
>>> already have higher precedence than TableConfig.
>>>
>>> It would be great to extend the meta-information of ExecNodes with
>>> state insights. I don't fully understand where your proposed
>>> StateMetadata is located? Would this be a value of @ExecNodeMetadata,
>>> StreamExecNode, or TwoInputStreamOperator?
>>>
>>> I think it should be a combination of ExecNodeMetadata with rough
>>> estimates (declaration) or StreamExecNode, but it should not bubble
>>> into TwoInputStreamOperator.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
>>>
>>> On 03.04.23 09:15, godfrey he wrote:
>>> > Hi Jane,
>>> >
>>> > Thanks for driving this FLIP.
>>> >
>>> > I think the compiled plan solution and the hint solution do not
>>> > conflict; the two can exist at the same time.
>>> > The compiled plan solution can address the needs of advanced users
>>> > and platform users, for whom the state TTL of all stateful operators
>>> > can be defined by the user, while the hint solution can address some
>>> > specific simple scenarios and is very user-friendly, convenient, and
>>> > unambiguous to use.
>>> >
>>> > Some stateful operators are not compiled from SQL directly, such as
>>> > ChangelogNormalize and SinkUpsertMaterializer mentioned above. I
>>> > notice that the example given by Yisha has a hint propagation problem
>>> > which does not conform to the current design.
>>> > The rough idea behind the hint solution should be simple (only the
>>> > common operators are supported) and easy to understand (no hint
>>> > propagation).
>>> >
>>> > If the hint solution is supported, a compiled plan generated from a
>>> > query with state TTL hints can also be further modified for the state
>>> > TTL parts.
>>> >
>>> > So, I prefer the hint solution to be discussed in a separate FLIP. I
>>> > think that FLIP may need a lot of discussion.
>>> >
>>> > Best,
>>> > Godfrey
>>> >
>>> > On Thu, Mar 30, 2023 at 22:04, 周伊莎 <zhouyi...@bytedance.com.invalid> wrote:
>>> >>
>>> >> Hi Jane,
>>> >>
>>> >> Thanks for your detailed response.
>>> >>
>>> >>> You mentioned that there are 10k+ SQL jobs in your production
>>> >>> environment, but only ~100 jobs' migration involves plan editing.
>>> >>> Is 10k+ the total number of jobs, or the number of jobs that use
>>> >>> stateful computation and need state migration?
>>> >>
>>> >> 10k is the number of SQL jobs that enable periodic checkpoints. And
>>> >> surely, if users change their SQL in a way that results in changes to
>>> >> the plan, they need to do state migration.
>>> >>
>>> >>> - You mentioned that "A truth that cannot be ignored is that users
>>> >>> usually tend to give up editing TTL (or operator ID in our case)
>>> >>> instead of migrating this configuration between their versions of
>>> >>> one given job." So what would users prefer to do if they're
>>> >>> reluctant to edit the operator ID? Would they submit the same SQL as
>>> >>> a new job with a higher version to re-accumulate the state from the
>>> >>> earliest offset?
>>> >>
>>> >> You're exactly right. People will tend to re-accumulate the state
>>> >> from a given offset by changing the namespace of their checkpoint.
>>> >> Namespace is an internal concept, and restarting the SQL job in a new
>>> >> namespace can simply be understood as submitting a new job.
>>> >>
>>> >>> Back to your suggestions, I noticed that FLIP-190 [3] proposed the
>>> >>> following syntax to perform plan migration
>>> >>
>>> >> The 'plan migration' I mentioned in my last reply may be inaccurate;
>>> >> it's more like 'query evolution'. In other words, if a user submitted
>>> >> a SQL job with a configured compiled plan and then changes the SQL,
>>> >> the compiled plan changes too, and the question is how to move the
>>> >> configuration in the old plan to the new plan.
>>> >> IIUC, FLIP-190 aims to solve issues in Flink version upgrades and
>>> >> leaves out 'query evolution', which is a fundamental change to the
>>> >> query, e.g. adding a filter condition or a different aggregation.
>>> >> And I'm really looking forward to a solution for query evolution.
>>> >>
>>> >>> And I'm also curious about how to use the hint approach to cover
>>> >>> cases like
>>> >>>
>>> >>> - configuring TTL for operators like ChangelogNormalize,
>>> >>>   SinkUpsertMaterializer, etc.; these operators are derived by the
>>> >>>   planner implicitly
>>> >>> - coping with two-/multiple-input stream operators' state TTL, like
>>> >>>   join, and other operations like row_number, rank, correlate, etc.
>>> >>
>>> >> Actually, in our company, we make a hint affect all operators in the
>>> >> query block where the hint is located.
>>> >> For example,
>>> >>
>>> >>> INSERT INTO sink
>>> >>> SELECT /*+ STATE_TTL('1D') */
>>> >>>     id,
>>> >>>     name,
>>> >>>     num
>>> >>> FROM (
>>> >>>     SELECT
>>> >>>         *,
>>> >>>         ROW_NUMBER() OVER (PARTITION BY id ORDER BY num DESC) as row_num
>>> >>>     FROM (
>>> >>>         SELECT
>>> >>>             *
>>> >>>         FROM (
>>> >>>             SELECT
>>> >>>                 id,
>>> >>>                 name,
>>> >>>                 max(num) as num
>>> >>>             FROM source1
>>> >>>             GROUP BY
>>> >>>                 id, name, TUMBLE(proc, INTERVAL '1' MINUTE)
>>> >>>         )
>>> >>>         GROUP BY
>>> >>>             id, name, num
>>> >>>     )
>>> >>> )
>>> >>> WHERE row_num = 1
>>> >>
>>> >> In the SQL above, the state TTL of the Rank and Agg operators will
>>> >> all be configured as 1 day. If users want to set different TTLs for
>>> >> Rank and Agg, they can simply put these two queries in two different
>>> >> query blocks. It looks quite rough but is straightforward enough. For
>>> >> each side of the join operator, one of my users proposed a syntax
>>> >> like the one below:
>>> >>
>>> >>> /*+ JOIN_TTL('tables'='left_table,right_table','left_ttl'='100000','right_ttl'='10000') */
>>> >>
>>> >> We haven't accepted this proposal yet; maybe we could find some
>>> >> better design for this kind of case. Just for your information.
>>> >>
>>> >> I think if we want to utilize hints to support fine-grained
>>> >> configuration, we can open a new FLIP to discuss it.
>>> >> BTW, personally, I'm interested in how to design a graphical
>>> >> interface to help users maintain their custom fine-grained
>>> >> configuration between their job versions.
>>> >>
>>> >> Best regards,
>>> >> Yisha