Hi, devs,

Thanks for your valuable feedback. I've changed the title of the FLIP to
"Enhance COMPILED PLAN to support operator-level state TTL configuration"
and added an explanation for StateMetadata. If you have any concerns,
please let me know.

Best regards,
Jane

On Mon, Apr 3, 2023 at 11:42 PM Jane Chan <qingyue....@gmail.com> wrote:

> Hi Timo,
>
> Thanks for your valuable feedback. Let me explain the design details.
>
> > However, actually fine-grained state TTL should already be possible
> today. I don't fully understand where your proposed StateMetadata is
> located? Would this be a value of @ExecNodeMetadata, StreamExecNode, or
> TwoInputStreamOperator?
>
> Currently, all ExecNodes that support JSON SerDe are annotated with
> @ExecNoteMetadata. This annotation interface has a key called
> consumedOptions, which persists all configuration that affects the
> topology. For ExecNodes that translate to OneInputStreamOperator, adding
> "table.exec.state.ttl" to consumedOptions is enough to achieve the goal of
> configuring TTL with fine granularity. However, this is not a generalized
> solution for ExecNodes that translate to TwoInputStreamOperator or
> MultipleInputStreamOperator. Because we may need to set different TTLs for
> the left / right (or k-th) input stream, but we do not want to introduce
> configurations like "table.exec.left-state.ttl" or
> "table.exec.right-state.ttl" or "table.exec.kth-input-state.ttl".
>
> The proposed StateMetadata will be the member variable of ExecNodes that
> translates to stateful operators, similar to inputProperties (which is
> shared by all ExecNodes, though).
> I'd like to illustrate this in the following snippet of code for
> StreamExecJoin.
>
> @ExecNodeMetadata(
>       name = "stream-exec-join",
>       version = 1,
>       producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
>       minPlanVersion = FlinkVersion.v1_15,
>       minStateVersion = FlinkVersion.v1_15)
>
> +@ExecNodeMetadata(
> +       name = "stream-exec-join",
> +       version = 2,
> +       producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
> +       minPlanVersion = FlinkVersion.v1_18,
> +       minStateVersion = FlinkVersion.v1_15)
> public class StreamExecJoin extends ExecNodeBase<RowData>
>       implements StreamExecNode<RowData>, SingleTransformationTranslator<
> RowData> {
>
>
> +   @Nullable
> +   @JsonProperty(FIELD_NAME_STATE)
> +   private final List<StateMetadata> stateMetadataList;
>
>   public StreamExecJoin(
>           ReadableConfig tableConfig,
>           JoinSpec joinSpec,
>           List<int[]> leftUpsertKeys,
>           List<int[]> rightUpsertKeys,
>           InputProperty leftInputProperty,
>           InputProperty rightInputProperty,
>           RowType outputType,
>           String description) {
>       this(
>               ExecNodeContext.newNodeId(),
>               ExecNodeContext.newContext(StreamExecJoin.class),
>               ExecNodeContext.newPersistedConfig(StreamExecJoin.class,
> tableConfig),
>               joinSpec,
>               leftUpsertKeys,
>               rightUpsertKeys,
>               Lists.newArrayList(leftInputProperty, rightInputProperty),
>               outputType,
>               description,
> +              StateMetadata.multiInputDefaultMeta(tableConfig,
> LEFT_STATE_NAME, RIGHT_STATE_NAME));
>   }
>
>   @JsonCreator
>   public StreamExecJoin(
>           @JsonProperty(FIELD_NAME_ID) int id,
>           @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
>           @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
> persistedConfig,
>           @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
>           @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) List<int[]>
> leftUpsertKeys,
>           @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) List<int[]>
> rightUpsertKeys,
>           @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
> inputProperties,
>           @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
>           @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
>
> +          @Nullable
> +          @JsonProperty(FIELD_NAME_STATE) List<StateMetadata>
> stateMetadataList) {
>       super(id, context, persistedConfig, inputProperties, outputType,
> description);
>       checkArgument(inputProperties.size() == 2);
>       this.joinSpec = checkNotNull(joinSpec);
>       this.leftUpsertKeys = leftUpsertKeys;
>       this.rightUpsertKeys = rightUpsertKeys;
> +      this.stateMetadataList = stateMetadataList;
>   }
> @Override
>   @SuppressWarnings("unchecked")
>   protected Transformation<RowData> translateToPlanInternal(
>           PlannerBase planner, ExecNodeConfig config) {
>       final ExecEdge leftInputEdge = …;
>       final ExecEdge rightInputEdge = …;
>       . . .
>
>        // for backward compatibility
>       long leftStateRetentionTime =
>               isNullOrEmpty(stateMetadataList)
>                       ? config.getStateRetentionTime()
>                       : stateMetadataList.get(0).getStateTtl();
>       long rightStateRetentionTime =
>               isNullOrEmpty(stateMetadataList)
>                       ? leftStateRetentionTime
>                       : stateMetadataList.get(1).getStateTtl();
>
>       AbstractStreamingJoinOperator operator;
>       FlinkJoinType joinType = joinSpec.getJoinType();
>       if (joinType == FlinkJoinType.ANTI || joinType ==
> FlinkJoinType.SEMI) {
>           operator =
>                   new StreamingSemiAntiJoinOperator(
>                           joinType == FlinkJoinType.ANTI,
>                           leftTypeInfo,
>                           rightTypeInfo,
>                           generatedCondition,
>                           leftInputSpec,
>                           rightInputSpec,
>                           joinSpec.getFilterNulls(),
>                           leftStateRetentionTime,
> +                          rightStateRetentionTime);
>       } else {
>
>            operator =
>                   new StreamingJoinOperator(
>                           leftTypeInfo,
>                           rightTypeInfo,
>                           generatedCondition,
>                           leftInputSpec,
>                           rightInputSpec,
>                           leftIsOuter,
>                           rightIsOuter,
>                           joinSpec.getFilterNulls(),
>                           leftStateRetentionTime,
> +                          rightStateRetentionTime);
>       }
>
>       . . .
>       return transform;
>   }
> }
>
>
>
>
> Best regards,
> Jane
>
> On Mon, Apr 3, 2023 at 10:01 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Jane,
>>
>> thanks for proposing this FLIP. More state insights and fine-grained
>> state TTL are a frequently requested feature for Flink SQL. Eventually,
>> we need to address this.
>>
>> I agree with the previous responses that doing this with a hint might
>> cause more confusion than it actually helps. We should use hints only if
>> they can be placed close to an operation (e.g. JOIN or table). And only
>> where a global flag for the entire query is not sufficient using SET.
>>
>> In general, I support the current direction of the FLIP and continuing
>> the vision of FLIP-190. However, actually fine-grained state TTL should
>> already be possible today. Maybe this is untested yet, but we largely
>> reworked how configuration works within the planner in Flink 1.15.
>>
>> As you quickly mentioned in the FLIP, ExecNodeConfig[1] already combines
>> configuration coming from TableConfig with per-ExecNode config.
>> Actually, state TTL from JSON plan should already have higher precedence
>> than TableConfig.
>>
>> It would be great to extend the meta-information of ExecNodes with state
>> insights. I don't fully understand where your proposed StateMetadata is
>> located? Would this be a value of @ExecNodeMetadata, StreamExecNode, or
>> TwoInputStreamOperator?
>>
>> I think it should be a combination of ExecNodeMetadata with rough
>> estimates (declaration) or StreamExecNode. But should not bubble into
>> TwoInputStreamOperator.
>>
>> Regards,
>> Timo
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
>>
>> On 03.04.23 09:15, godfrey he wrote:
>> > Hi Jane,
>> >
>> > Thanks for driving this FLIP.
>> >
>> > I think the compiled plan solution and the hint solution do not
>> > conflict, the two can exist at the same time.
>> > The compiled plan solution can address the need of advanced users and
>> > the platform users
>> > which all stateful operators' state TTL can be defined by user. While
>> > the hint solution can address some
>> >   specific simple scenarios, which is very user-friendly, convenient,
>> > and unambiguous to use.
>> >
>> > Some stateful operators are not compiled from SQL directly, such as
>> > ChangelogNormalize and
>> > SinkUpsertMaterializer mentioned above,  I notice the the example given
>> by Yisha
>> > has hints propagation problem which does not conform to the current
>> design.
>> > The rough idea about the hint solution should be simple (only the
>> > common operators are supported)
>> > and easy to understand (no hints propagation).
>> >
>> > If the hint solution is supported, a compiled plan which is from a
>> > query with state TTL hints
>> >   can also be further modified for the state TTL parts.
>> >
>> > So, I prefer the hint solution to be discuss in a separate FLIP.  I
>> > think that FLIP maybe
>> > need a lot discussion.
>> >
>> > Best,
>> > Godfrey
>> >
>> > 周伊莎 <zhouyi...@bytedance.com.invalid> 于2023年3月30日周四 22:04写道:
>> >>
>> >> Hi Jane,
>> >>
>> >> Thanks for your detailed response.
>> >>
>> >> You mentioned that there are 10k+ SQL jobs in your production
>> >>> environment, but only ~100 jobs' migration involves plan editing. Is
>> 10k+
>> >>> the number of total jobs, or the number of jobs that use stateful
>> >>> computation and need state migration?
>> >>>
>> >>
>> >> 10k is the number of SQL jobs that enable periodic checkpoint. And
>> >> surely if users change their sql which result in changes of the plan,
>> they
>> >> need to do state migration.
>> >>
>> >> - You mentioned that "A truth that can not be ignored is that users
>> >>> usually tend to give up editing TTL(or operator ID in our case)
>> instead of
>> >>> migrating this configuration between their versions of one given
>> job." So
>> >>> what would users prefer to do if they're reluctant to edit the
>> operator
>> >>> ID? Would they submit the same SQL as a new job with a higher version
>> to
>> >>> re-accumulating the state from the earliest offset?
>> >>
>> >>
>> >> You're exactly right. People will tend to re-accumulate the state from
>> a
>> >> given offset by changing the namespace of their checkpoint.
>> >> Namespace is an internal concept and restarting the sql job in a new
>> >> namespace can be simply understood as submitting a new job.
>> >>
>> >> Back to your suggestions, I noticed that FLIP-190 [3] proposed the
>> >>> following syntax to perform plan migration
>> >>
>> >>
>> >> The 'plan migration'  I said in my last reply may be inaccurate.  It's
>> more
>> >> like 'query evolution'. In other word, if a user submitted a sql job
>> with a
>> >> configured compiled plan, and then
>> >> he changes the sql,  the compiled plan changes too, how to move the
>> >> configuration in the old plan to the new plan.
>> >> IIUC, FLIP-190 aims to solve issues in flink version upgrades and
>> leave out
>> >> the 'query evolution' which is a fundamental change to the query. E.g.
>> >> adding a filter condition, a different aggregation.
>> >> And I'm really looking forward to a solution for query evolution.
>> >>
>> >> And I'm also curious about how to use the hint
>> >>> approach to cover cases like
>> >>>
>> >>> - configuring TTL for operators like ChangelogNormalize,
>> >>> SinkUpsertMaterializer, etc., these operators are derived by the
>> planner
>> >>> implicitly
>> >>> - cope with two/multiple input stream operator's state TTL, like join,
>> >>> and other operations like row_number, rank, correlate, etc.
>> >>
>> >>
>> >>   Actually, in our company , we make operators in the query block
>> where the
>> >> hint locates all affected by that hint. For example,
>> >>
>> >> INSERT INTO sink
>> >>> SELECT /*+ STATE_TTL('1D') */
>> >>>     id,
>> >>>     name,
>> >>>     num
>> >>> FROM (
>> >>>     SELECT
>> >>>         *,
>> >>>         ROW_NUMBER() OVER (PARTITION BY id ORDER BY num DESC) as
>> row_num
>> >>>     FROM (
>> >>>         SELECT
>> >>>             *
>> >>>         FROM (
>> >>>             SELECT
>> >>>                 id,
>> >>>                 name,
>> >>>                 max(num) as num
>> >>>             FROM source1
>> >>>             GROUP BY
>> >>>                 id, name, TUMBLE(proc, INTERVAL '1' MINUTE)
>> >>>         )
>> >>>         GROUP BY
>> >>>             id, name, num
>> >>>     )
>> >>> )
>> >>> WHERE row_num = 1
>> >>>
>> >>
>> >> In the SQL above, the state TTL of Rank and Agg will be all configured
>> as 1
>> >> day.  If users want to set different TTL for Rank and Agg, they can
>> just
>> >> make these two queries located in two different query blocks.
>> >> It looks quite rough but straightforward enough.  For each side of join
>> >> operator, one of my users proposed a syntax like below:
>> >>
>> >>> /*+
>> JOIN_TTL('tables'='left_talbe,right_table','left_ttl'='100000','right_ttl'='10000')
>> */
>> >>>
>> >>> We haven't accepted this proposal now, maybe we could find some better
>> >> design for this kind of case. Just for your information.
>> >>
>> >> I think if we want to utilize hints to support fine-grained
>> configuration,
>> >> we can open a new FLIP to discuss it.
>> >> BTW, personally, I'm interested in how to design a graphical interface
>> to
>> >> help users to maintain their custom fine-grained configuration between
>> >> their job versions.
>> >>
>> >> Best regards,
>> >> Yisha
>> >
>>
>>

Reply via email to