Hi, devs,

Thanks for all the feedback.

Based on the discussion [1], we seem to have a consensus so far, so I would
like to start a vote on FLIP-292 [2], which begins on the following Monday
(Apr. 10th at 10:00 AM GMT).

If you have any questions or concerns, please don't hesitate to follow up
on this discussion.

[1] https://lists.apache.org/thread/ffmc96gv8ofoskbxlhtm7w8oxv8nqzct
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951&show-miniview

Best regards,
Jane

On Thu, Apr 6, 2023 at 10:33 AM Jane Chan <qingyue....@gmail.com> wrote:

> Hi, devs,
>
> Thanks for your valuable feedback. I've changed the title of the FLIP to
> "Enhance COMPILED PLAN to support operator-level state TTL configuration"
> and added an explanation for StateMetadata. If you have any concerns,
> please let me know.
>
> Best regards,
> Jane
>
> On Mon, Apr 3, 2023 at 11:42 PM Jane Chan <qingyue....@gmail.com> wrote:
>
>> Hi Timo,
>>
>> Thanks for your valuable feedback. Let me explain the design details.
>>
>> > However, actually fine-grained state TTL should already be possible
>> today. I don't fully understand where your proposed StateMetadata is
>> located? Would this be a value of @ExecNodeMetadata, StreamExecNode, or
>> TwoInputStreamOperator?
>>
>> Currently, all ExecNodes that support JSON SerDe are annotated with
>> @ExecNoteMetadata. This annotation interface has a key called
>> consumedOptions, which persists all configuration that affects the
>> topology. For ExecNodes that translate to OneInputStreamOperator, adding
>> "table.exec.state.ttl" to consumedOptions is enough to achieve the goal of
>> configuring TTL with fine granularity. However, this is not a generalized
>> solution for ExecNodes that translate to TwoInputStreamOperator or
>> MultipleInputStreamOperator. Because we may need to set different TTLs for
>> the left / right (or k-th) input stream, but we do not want to introduce
>> configurations like "table.exec.left-state.ttl" or
>> "table.exec.right-state.ttl" or "table.exec.kth-input-state.ttl".
>>
>> The proposed StateMetadata will be the member variable of ExecNodes that
>> translates to stateful operators, similar to inputProperties (which is
>> shared by all ExecNodes, though).
>> I'd like to illustrate this in the following snippet of code for
>> StreamExecJoin.
>>
>> @ExecNodeMetadata(
>>       name = "stream-exec-join",
>>       version = 1,
>>       producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>>       minPlanVersion = FlinkVersion.v1_15,
>>       minStateVersion = FlinkVersion.v1_15)
>>
>> +@ExecNodeMetadata(
>> +       name = "stream-exec-join",
>> +       version = 2,
>> +       producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>> +       minPlanVersion = FlinkVersion.v1_18,
>> +       minStateVersion = FlinkVersion.v1_15)
>> public class StreamExecJoin extends ExecNodeBase<RowData>
>>       implements StreamExecNode<RowData>, SingleTransformationTranslator<
>> RowData> {
>>
>>
>> +   @Nullable
>> +   @JsonProperty(FIELD_NAME_STATE)
>> +   private final List<StateMetadata> stateMetadataList;
>>
>>   public StreamExecJoin(
>>           ReadableConfig tableConfig,
>>           JoinSpec joinSpec,
>>           List<int[]> leftUpsertKeys,
>>           List<int[]> rightUpsertKeys,
>>           InputProperty leftInputProperty,
>>           InputProperty rightInputProperty,
>>           RowType outputType,
>>           String description) {
>>       this(
>>               ExecNodeContext.newNodeId(),
>>               ExecNodeContext.newContext(StreamExecJoin.class),
>>               ExecNodeContext.newPersistedConfig(StreamExecJoin.class,
>> tableConfig),
>>               joinSpec,
>>               leftUpsertKeys,
>>               rightUpsertKeys,
>>               Lists.newArrayList(leftInputProperty, rightInputProperty),
>>               outputType,
>>               description,
>> +              StateMetadata.multiInputDefaultMeta(tableConfig,
>> LEFT_STATE_NAME, RIGHT_STATE_NAME));
>>   }
>>
>>   @JsonCreator
>>   public StreamExecJoin(
>>           @JsonProperty(FIELD_NAME_ID) int id,
>>           @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
>>           @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
>> persistedConfig,
>>           @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
>>           @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) List<int[]>
>> leftUpsertKeys,
>>           @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) List<int[]>
>> rightUpsertKeys,
>>           @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
>> inputProperties,
>>           @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
>>           @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
>>
>> +          @Nullable
>> +          @JsonProperty(FIELD_NAME_STATE) List<StateMetadata>
>> stateMetadataList) {
>>       super(id, context, persistedConfig, inputProperties, outputType,
>> description);
>>       checkArgument(inputProperties.size() == 2);
>>       this.joinSpec = checkNotNull(joinSpec);
>>       this.leftUpsertKeys = leftUpsertKeys;
>>       this.rightUpsertKeys = rightUpsertKeys;
>> +      this.stateMetadataList = stateMetadataList;
>>   }
>> @Override
>>   @SuppressWarnings("unchecked")
>>   protected Transformation<RowData> translateToPlanInternal(
>>           PlannerBase planner, ExecNodeConfig config) {
>>       final ExecEdge leftInputEdge = …;
>>       final ExecEdge rightInputEdge = …;
>>       . . .
>>
>>        // for backward compatibility
>>       long leftStateRetentionTime =
>>               isNullOrEmpty(stateMetadataList)
>>                       ? config.getStateRetentionTime()
>>                       : stateMetadataList.get(0).getStateTtl();
>>       long rightStateRetentionTime =
>>               isNullOrEmpty(stateMetadataList)
>>                       ? leftStateRetentionTime
>>                       : stateMetadataList.get(1).getStateTtl();
>>
>>       AbstractStreamingJoinOperator operator;
>>       FlinkJoinType joinType = joinSpec.getJoinType();
>>       if (joinType == FlinkJoinType.ANTI || joinType ==
>> FlinkJoinType.SEMI) {
>>           operator =
>>                   new StreamingSemiAntiJoinOperator(
>>                           joinType == FlinkJoinType.ANTI,
>>                           leftTypeInfo,
>>                           rightTypeInfo,
>>                           generatedCondition,
>>                           leftInputSpec,
>>                           rightInputSpec,
>>                           joinSpec.getFilterNulls(),
>>                           leftStateRetentionTime,
>> +                          rightStateRetentionTime);
>>       } else {
>>
>>            operator =
>>                   new StreamingJoinOperator(
>>                           leftTypeInfo,
>>                           rightTypeInfo,
>>                           generatedCondition,
>>                           leftInputSpec,
>>                           rightInputSpec,
>>                           leftIsOuter,
>>                           rightIsOuter,
>>                           joinSpec.getFilterNulls(),
>>                           leftStateRetentionTime,
>> +                          rightStateRetentionTime);
>>       }
>>
>>       . . .
>>       return transform;
>>   }
>> }
>>
>>
>>
>>
>> Best regards,
>> Jane
>>
>> On Mon, Apr 3, 2023 at 10:01 PM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Jane,
>>>
>>> thanks for proposing this FLIP. More state insights and fine-grained
>>> state TTL are a frequently requested feature for Flink SQL. Eventually,
>>> we need to address this.
>>>
>>> I agree with the previous responses that doing this with a hint might
>>> cause more confusion than it actually helps. We should use hints only if
>>> they can be placed close to an operation (e.g. JOIN or table). And only
>>> where a global flag for the entire query is not sufficient using SET.
>>>
>>> In general, I support the current direction of the FLIP and continuing
>>> the vision of FLIP-190. However, actually fine-grained state TTL should
>>> already be possible today. Maybe this is untested yet, but we largely
>>> reworked how configuration works within the planner in Flink 1.15.
>>>
>>> As you quickly mentioned in the FLIP, ExecNodeConfig[1] already combines
>>> configuration coming from TableConfig with per-ExecNode config.
>>> Actually, state TTL from JSON plan should already have higher precedence
>>> than TableConfig.
>>>
>>> It would be great to extend the meta-information of ExecNodes with state
>>> insights. I don't fully understand where your proposed StateMetadata is
>>> located? Would this be a value of @ExecNodeMetadata, StreamExecNode, or
>>> TwoInputStreamOperator?
>>>
>>> I think it should be a combination of ExecNodeMetadata with rough
>>> estimates (declaration) or StreamExecNode. But should not bubble into
>>> TwoInputStreamOperator.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>>
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
>>>
>>> On 03.04.23 09:15, godfrey he wrote:
>>> > Hi Jane,
>>> >
>>> > Thanks for driving this FLIP.
>>> >
>>> > I think the compiled plan solution and the hint solution do not
>>> > conflict, the two can exist at the same time.
>>> > The compiled plan solution can address the need of advanced users and
>>> > the platform users
>>> > which all stateful operators' state TTL can be defined by user. While
>>> > the hint solution can address some
>>> >   specific simple scenarios, which is very user-friendly, convenient,
>>> > and unambiguous to use.
>>> >
>>> > Some stateful operators are not compiled from SQL directly, such as
>>> > ChangelogNormalize and
>>> > SinkUpsertMaterializer mentioned above,  I notice the the example
>>> given by Yisha
>>> > has hints propagation problem which does not conform to the current
>>> design.
>>> > The rough idea about the hint solution should be simple (only the
>>> > common operators are supported)
>>> > and easy to understand (no hints propagation).
>>> >
>>> > If the hint solution is supported, a compiled plan which is from a
>>> > query with state TTL hints
>>> >   can also be further modified for the state TTL parts.
>>> >
>>> > So, I prefer the hint solution to be discuss in a separate FLIP.  I
>>> > think that FLIP maybe
>>> > need a lot discussion.
>>> >
>>> > Best,
>>> > Godfrey
>>> >
>>> > 周伊莎 <zhouyi...@bytedance.com.invalid> 于2023年3月30日周四 22:04写道:
>>> >>
>>> >> Hi Jane,
>>> >>
>>> >> Thanks for your detailed response.
>>> >>
>>> >> You mentioned that there are 10k+ SQL jobs in your production
>>> >>> environment, but only ~100 jobs' migration involves plan editing. Is
>>> 10k+
>>> >>> the number of total jobs, or the number of jobs that use stateful
>>> >>> computation and need state migration?
>>> >>>
>>> >>
>>> >> 10k is the number of SQL jobs that enable periodic checkpoint. And
>>> >> surely if users change their sql which result in changes of the plan,
>>> they
>>> >> need to do state migration.
>>> >>
>>> >> - You mentioned that "A truth that can not be ignored is that users
>>> >>> usually tend to give up editing TTL(or operator ID in our case)
>>> instead of
>>> >>> migrating this configuration between their versions of one given
>>> job." So
>>> >>> what would users prefer to do if they're reluctant to edit the
>>> operator
>>> >>> ID? Would they submit the same SQL as a new job with a higher
>>> version to
>>> >>> re-accumulating the state from the earliest offset?
>>> >>
>>> >>
>>> >> You're exactly right. People will tend to re-accumulate the state
>>> from a
>>> >> given offset by changing the namespace of their checkpoint.
>>> >> Namespace is an internal concept and restarting the sql job in a new
>>> >> namespace can be simply understood as submitting a new job.
>>> >>
>>> >> Back to your suggestions, I noticed that FLIP-190 [3] proposed the
>>> >>> following syntax to perform plan migration
>>> >>
>>> >>
>>> >> The 'plan migration'  I said in my last reply may be inaccurate.
>>> It's more
>>> >> like 'query evolution'. In other word, if a user submitted a sql job
>>> with a
>>> >> configured compiled plan, and then
>>> >> he changes the sql,  the compiled plan changes too, how to move the
>>> >> configuration in the old plan to the new plan.
>>> >> IIUC, FLIP-190 aims to solve issues in flink version upgrades and
>>> leave out
>>> >> the 'query evolution' which is a fundamental change to the query. E.g.
>>> >> adding a filter condition, a different aggregation.
>>> >> And I'm really looking forward to a solution for query evolution.
>>> >>
>>> >> And I'm also curious about how to use the hint
>>> >>> approach to cover cases like
>>> >>>
>>> >>> - configuring TTL for operators like ChangelogNormalize,
>>> >>> SinkUpsertMaterializer, etc., these operators are derived by the
>>> planner
>>> >>> implicitly
>>> >>> - cope with two/multiple input stream operator's state TTL, like
>>> join,
>>> >>> and other operations like row_number, rank, correlate, etc.
>>> >>
>>> >>
>>> >>   Actually, in our company , we make operators in the query block
>>> where the
>>> >> hint locates all affected by that hint. For example,
>>> >>
>>> >> INSERT INTO sink
>>> >>> SELECT /*+ STATE_TTL('1D') */
>>> >>>     id,
>>> >>>     name,
>>> >>>     num
>>> >>> FROM (
>>> >>>     SELECT
>>> >>>         *,
>>> >>>         ROW_NUMBER() OVER (PARTITION BY id ORDER BY num DESC) as
>>> row_num
>>> >>>     FROM (
>>> >>>         SELECT
>>> >>>             *
>>> >>>         FROM (
>>> >>>             SELECT
>>> >>>                 id,
>>> >>>                 name,
>>> >>>                 max(num) as num
>>> >>>             FROM source1
>>> >>>             GROUP BY
>>> >>>                 id, name, TUMBLE(proc, INTERVAL '1' MINUTE)
>>> >>>         )
>>> >>>         GROUP BY
>>> >>>             id, name, num
>>> >>>     )
>>> >>> )
>>> >>> WHERE row_num = 1
>>> >>>
>>> >>
>>> >> In the SQL above, the state TTL of Rank and Agg will be all
>>> configured as 1
>>> >> day.  If users want to set different TTL for Rank and Agg, they can
>>> just
>>> >> make these two queries located in two different query blocks.
>>> >> It looks quite rough but straightforward enough.  For each side of
>>> join
>>> >> operator, one of my users proposed a syntax like below:
>>> >>
>>> >>> /*+
>>> JOIN_TTL('tables'='left_talbe,right_table','left_ttl'='100000','right_ttl'='10000')
>>> */
>>> >>>
>>> >>> We haven't accepted this proposal now, maybe we could find some
>>> better
>>> >> design for this kind of case. Just for your information.
>>> >>
>>> >> I think if we want to utilize hints to support fine-grained
>>> configuration,
>>> >> we can open a new FLIP to discuss it.
>>> >> BTW, personally, I'm interested in how to design a graphical
>>> interface to
>>> >> help users to maintain their custom fine-grained configuration between
>>> >> their job versions.
>>> >>
>>> >> Best regards,
>>> >> Yisha
>>> >
>>>
>>>

Reply via email to