Hi Timo,

Thanks for your valuable feedback. Let me explain the design details.

> However, actually fine-grained state TTL should already be possible
> today. I don't fully understand where your proposed StateMetadata is
> located? Would this be a value of @ExecNodeMetadata, StreamExecNode, or
> TwoInputStreamOperator?

Currently, all ExecNodes that support JSON SerDe are annotated with
@ExecNodeMetadata. This annotation has a key called consumedOptions,
which persists all configuration that affects the topology. For ExecNodes
that translate to OneInputStreamOperator, adding "table.exec.state.ttl" to
consumedOptions is enough to configure TTL at a fine granularity. However,
this does not generalize to ExecNodes that translate to
TwoInputStreamOperator or MultipleInputStreamOperator, because we may need
to set different TTLs for the left / right (or k-th) input stream, and we
do not want to introduce configurations like "table.exec.left-state.ttl",
"table.exec.right-state.ttl", or "table.exec.kth-input-state.ttl".

The proposed StateMetadata will be a member variable of ExecNodes that
translate to stateful operators, similar to inputProperties (which,
however, is shared by all ExecNodes).
I'd like to illustrate this with the following code snippet for
StreamExecJoin.

@ExecNodeMetadata(
      name = "stream-exec-join",
      version = 1,
      producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
      minPlanVersion = FlinkVersion.v1_15,
      minStateVersion = FlinkVersion.v1_15)

+@ExecNodeMetadata(
+       name = "stream-exec-join",
+       version = 2,
+       producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
+       minPlanVersion = FlinkVersion.v1_18,
+       minStateVersion = FlinkVersion.v1_15)
public class StreamExecJoin extends ExecNodeBase<RowData>
      implements StreamExecNode<RowData>,
              SingleTransformationTranslator<RowData> {


+   @Nullable
+   @JsonProperty(FIELD_NAME_STATE)
+   private final List<StateMetadata> stateMetadataList;

  public StreamExecJoin(
          ReadableConfig tableConfig,
          JoinSpec joinSpec,
          List<int[]> leftUpsertKeys,
          List<int[]> rightUpsertKeys,
          InputProperty leftInputProperty,
          InputProperty rightInputProperty,
          RowType outputType,
          String description) {
      this(
              ExecNodeContext.newNodeId(),
              ExecNodeContext.newContext(StreamExecJoin.class),
              ExecNodeContext.newPersistedConfig(
                      StreamExecJoin.class, tableConfig),
              joinSpec,
              leftUpsertKeys,
              rightUpsertKeys,
              Lists.newArrayList(leftInputProperty, rightInputProperty),
              outputType,
              description,
+              StateMetadata.multiInputDefaultMeta(
+                      tableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME));
  }

  @JsonCreator
  public StreamExecJoin(
          @JsonProperty(FIELD_NAME_ID) int id,
          @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
          @JsonProperty(FIELD_NAME_CONFIGURATION)
                  ReadableConfig persistedConfig,
          @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
          @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS)
                  List<int[]> leftUpsertKeys,
          @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS)
                  List<int[]> rightUpsertKeys,
          @JsonProperty(FIELD_NAME_INPUT_PROPERTIES)
                  List<InputProperty> inputProperties,
          @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
          @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
+          @Nullable
+          @JsonProperty(FIELD_NAME_STATE)
+                  List<StateMetadata> stateMetadataList) {
      super(id, context, persistedConfig, inputProperties, outputType,
              description);
      checkArgument(inputProperties.size() == 2);
      this.joinSpec = checkNotNull(joinSpec);
      this.leftUpsertKeys = leftUpsertKeys;
      this.rightUpsertKeys = rightUpsertKeys;
+      this.stateMetadataList = stateMetadataList;
  }

  @Override
  @SuppressWarnings("unchecked")
  protected Transformation<RowData> translateToPlanInternal(
          PlannerBase planner, ExecNodeConfig config) {
      final ExecEdge leftInputEdge = …;
      final ExecEdge rightInputEdge = …;
      . . .

      // for backward compatibility: fall back to the job-level TTL when
      // the plan carries no state metadata
      long leftStateRetentionTime =
              isNullOrEmpty(stateMetadataList)
                      ? config.getStateRetentionTime()
                      : stateMetadataList.get(0).getStateTtl();
      long rightStateRetentionTime =
              isNullOrEmpty(stateMetadataList)
                      ? leftStateRetentionTime
                      : stateMetadataList.get(1).getStateTtl();

      AbstractStreamingJoinOperator operator;
      FlinkJoinType joinType = joinSpec.getJoinType();
      if (joinType == FlinkJoinType.ANTI
              || joinType == FlinkJoinType.SEMI) {
          operator =
                  new StreamingSemiAntiJoinOperator(
                          joinType == FlinkJoinType.ANTI,
                          leftTypeInfo,
                          rightTypeInfo,
                          generatedCondition,
                          leftInputSpec,
                          rightInputSpec,
                          joinSpec.getFilterNulls(),
                          leftStateRetentionTime,
+                          rightStateRetentionTime);
      } else {

          operator =
                  new StreamingJoinOperator(
                          leftTypeInfo,
                          rightTypeInfo,
                          generatedCondition,
                          leftInputSpec,
                          rightInputSpec,
                          leftIsOuter,
                          rightIsOuter,
                          joinSpec.getFilterNulls(),
                          leftStateRetentionTime,
+                          rightStateRetentionTime);
      }

      . . .
      return transform;
  }
}
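
To make the shape of StateMetadata more concrete, here is a minimal,
self-contained sketch. It is not the actual Flink class: the field names
(stateIndex, stateTtl, stateName), the resolveTtl helper, and passing a
Duration instead of the tableConfig used in the snippet above are all
assumptions made for illustration. It shows one metadata entry per input
and the same null-or-empty fallback as in translateToPlanInternal.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; names and fields are assumptions, not Flink's API. */
public class StateMetadataSketch {

    /** Hypothetical per-input state description. */
    public static final class StateMetadata {
        private final int stateIndex;    // which input this entry belongs to
        private final Duration stateTtl; // retention time for that input's state
        private final String stateName;  // human-readable label in the plan

        public StateMetadata(int stateIndex, Duration stateTtl, String stateName) {
            this.stateIndex = stateIndex;
            this.stateTtl = stateTtl;
            this.stateName = stateName;
        }

        public int getStateIndex() { return stateIndex; }
        public long getStateTtl() { return stateTtl.toMillis(); }
        public String getStateName() { return stateName; }

        /** Builds one entry per input, each starting from the same default TTL. */
        public static List<StateMetadata> multiInputDefaultMeta(
                Duration defaultTtl, String... stateNames) {
            List<StateMetadata> result = new ArrayList<>();
            for (int i = 0; i < stateNames.length; i++) {
                result.add(new StateMetadata(i, defaultTtl, stateNames[i]));
            }
            return result;
        }
    }

    /** Returns the per-input TTL in ms, or the fallback if no metadata exists. */
    public static long resolveTtl(
            List<StateMetadata> metadata, int inputIndex, long fallbackMillis) {
        if (metadata == null || metadata.isEmpty()) {
            return fallbackMillis; // e.g. a plan compiled without state metadata
        }
        return metadata.get(inputIndex).getStateTtl();
    }

    public static void main(String[] args) {
        List<StateMetadata> defaults = StateMetadata.multiInputDefaultMeta(
                Duration.ofHours(1), "leftState", "rightState");
        // Someone editing the compiled plan could change only the right side.
        List<StateMetadata> edited = new ArrayList<>(defaults);
        edited.set(1, new StateMetadata(1, Duration.ofMinutes(10), "rightState"));

        System.out.println(resolveTtl(edited, 0, 0L));        // 3600000
        System.out.println(resolveTtl(edited, 1, 0L));        // 600000
        System.out.println(resolveTtl(null, 1, 86_400_000L)); // 86400000
    }
}
```

The point of keeping a list indexed by input is that the left and right
TTLs can diverge without introducing any new per-side configuration keys.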




Best regards,
Jane

On Mon, Apr 3, 2023 at 10:01 PM Timo Walther <twal...@apache.org> wrote:

> Hi Jane,
>
> thanks for proposing this FLIP. More state insights and fine-grained
> state TTL are a frequently requested feature for Flink SQL. Eventually,
> we need to address this.
>
> I agree with the previous responses that doing this with a hint might
> cause more confusion than it resolves. We should use hints only if
> they can be placed close to an operation (e.g. JOIN or table), and only
> where a global flag for the entire query, set using SET, is not
> sufficient.
>
> In general, I support the current direction of the FLIP and continuing
> the vision of FLIP-190. However, actually fine-grained state TTL should
> already be possible today. Maybe this is untested yet, but we largely
> reworked how configuration works within the planner in Flink 1.15.
>
> As you quickly mentioned in the FLIP, ExecNodeConfig[1] already combines
> configuration coming from TableConfig with per-ExecNode config.
> Actually, state TTL from JSON plan should already have higher precedence
> than TableConfig.
>
> It would be great to extend the meta-information of ExecNodes with state
> insights. I don't fully understand where your proposed StateMetadata is
> located? Would this be a value of @ExecNodeMetadata, StreamExecNode, or
> TwoInputStreamOperator?
>
> I think it should be a combination of ExecNodeMetadata with rough
> estimates (declaration) or StreamExecNode. But it should not bubble
> into TwoInputStreamOperator.
>
> Regards,
> Timo
>
> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
>
> On 03.04.23 09:15, godfrey he wrote:
> > Hi Jane,
> >
> > Thanks for driving this FLIP.
> >
> > I think the compiled plan solution and the hint solution do not
> > conflict; the two can exist at the same time.
> > The compiled plan solution can address the needs of advanced users and
> > platform users, for whom the state TTL of every stateful operator can
> > be defined by the user, while the hint solution can address some
> > specific simple scenarios and is very user-friendly, convenient,
> > and unambiguous to use.
> >
> > Some stateful operators are not compiled from SQL directly, such as
> > ChangelogNormalize and SinkUpsertMaterializer mentioned above. I
> > notice that the example given by Yisha has a hint propagation problem,
> > which does not conform to the current design.
> > The rough idea of the hint solution should be that it is simple (only
> > the common operators are supported) and easy to understand (no hint
> > propagation).
> >
> > If the hint solution is supported, a compiled plan that comes from a
> > query with state TTL hints can also be further modified for the state
> > TTL parts.
> >
> > So, I prefer that the hint solution be discussed in a separate FLIP. I
> > think that FLIP may need a lot of discussion.
> >
> > Best,
> > Godfrey
> >
> > 周伊莎 <zhouyi...@bytedance.com.invalid> 于2023年3月30日周四 22:04写道:
> >>
> >> Hi Jane,
> >>
> >> Thanks for your detailed response.
> >>
> >>> You mentioned that there are 10k+ SQL jobs in your production
> >>> environment, but only ~100 jobs' migration involves plan editing. Is
> >>> 10k+ the number of total jobs, or the number of jobs that use stateful
> >>> computation and need state migration?
> >>>
> >>
> >> 10k is the number of SQL jobs that enable periodic checkpointing. And
> >> surely, if users change their SQL in a way that changes the plan, they
> >> need to do state migration.
> >>
> >>> - You mentioned that "A truth that can not be ignored is that users
> >>> usually tend to give up editing TTL(or operator ID in our case)
> >>> instead of migrating this configuration between their versions of one
> >>> given job." So what would users prefer to do if they're reluctant to
> >>> edit the operator ID? Would they submit the same SQL as a new job with
> >>> a higher version to re-accumulate the state from the earliest offset?
> >>
> >>
> >> You're exactly right. People will tend to re-accumulate the state from a
> >> given offset by changing the namespace of their checkpoint.
> >> Namespace is an internal concept and restarting the sql job in a new
> >> namespace can be simply understood as submitting a new job.
> >>
> >>> Back to your suggestions, I noticed that FLIP-190 [3] proposed the
> >>> following syntax to perform plan migration
> >>
> >>
> >> The 'plan migration' I mentioned in my last reply may be inaccurate.
> >> It's more like 'query evolution'. In other words, if a user submits a
> >> SQL job with a configured compiled plan and then changes the SQL, the
> >> compiled plan changes too; the question is how to move the
> >> configuration in the old plan to the new plan.
> >> IIUC, FLIP-190 aims to solve issues in Flink version upgrades and
> >> leaves out 'query evolution', which is a fundamental change to the
> >> query, e.g. adding a filter condition or a different aggregation.
> >> And I'm really looking forward to a solution for query evolution.
> >>
> >>> And I'm also curious about how to use the hint
> >>> approach to cover cases like
> >>>
> >>> - configuring TTL for operators like ChangelogNormalize,
> >>> SinkUpsertMaterializer, etc., these operators are derived by the
> >>> planner implicitly
> >>> - cope with two/multiple input stream operator's state TTL, like join,
> >>> and other operations like row_number, rank, correlate, etc.
> >>
> >>
> >> Actually, in our company, we make all operators in the query block
> >> where the hint is located affected by that hint. For example,
> >>
> >> INSERT INTO sink
> >>> SELECT /*+ STATE_TTL('1D') */
> >>>     id,
> >>>     name,
> >>>     num
> >>> FROM (
> >>>     SELECT
> >>>         *,
> >>>         ROW_NUMBER() OVER (PARTITION BY id ORDER BY num DESC) as row_num
> >>>     FROM (
> >>>         SELECT
> >>>             *
> >>>         FROM (
> >>>             SELECT
> >>>                 id,
> >>>                 name,
> >>>                 max(num) as num
> >>>             FROM source1
> >>>             GROUP BY
> >>>                 id, name, TUMBLE(proc, INTERVAL '1' MINUTE)
> >>>         )
> >>>         GROUP BY
> >>>             id, name, num
> >>>     )
> >>> )
> >>> WHERE row_num = 1
> >>>
> >>
> >> In the SQL above, the state TTL of both Rank and Agg will be
> >> configured as 1 day. If users want to set different TTLs for Rank and
> >> Agg, they can just place these two queries in two different query
> >> blocks.
> >> It looks quite rough but straightforward enough. For each side of the
> >> join operator, one of my users proposed a syntax like the one below:
> >>
> >>> /*+ JOIN_TTL('tables'='left_table,right_table','left_ttl'='100000','right_ttl'='10000') */
> >>>
> >> We haven't accepted this proposal yet; maybe we could find a better
> >> design for this kind of case. Just for your information.
> >>
> >> I think if we want to utilize hints to support fine-grained
> >> configuration, we can open a new FLIP to discuss it.
> >> BTW, personally, I'm interested in how to design a graphical interface
> >> to help users maintain their custom fine-grained configuration between
> >> their job versions.
> >>
> >> Best regards,
> >> Yisha
> >
>
>
