lincoln-lil commented on code in PR #21545: URL: https://github.com/apache/flink/pull/21545#discussion_r1101012560
########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java: ########## @@ -61,89 +76,208 @@ public void before() throws Exception { + ") WITH (\n" + " 'connector' = 'values'\n" + ")"); + util.tableEnv() + .createTemporarySystemFunction( + "MockOffset", + new ClearLookupJoinHintWithInvalidPropagationShuttleTest() + .new MockOffsetTableFunction()); } @Test public void testNoNeedToClearLookupHint() { // SELECT /*+ LOOKUP('table'='lookup', 'retry-predicate'='lookup_miss', - // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10') ) */ * - // FROM src - // JOIN lookup FOR SYSTEM_TIME AS OF T.proctime AS D - // ON T.a = D.a + // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10', + // 'async'='true', 'output-mode'='allow_unordered','capacity'='1000', 'time-out'='300 s') + // */ s.a + // FROM src s + // JOIN lookup FOR SYSTEM_TIME AS OF s.pts AS d + // ON s.a=d.a + CorrelationId cid = builder.getCluster().createCorrel(); + RelDataType aType = + builder.getTypeFactory() + .createStructType( + Collections.singletonList( + builder.getTypeFactory().createSqlType(SqlTypeName.BIGINT)), + Collections.singletonList("a")); + RelDataType ptsType = + builder.getTypeFactory() + .createStructType( + Collections.singletonList( + builder.getTypeFactory() + .createProctimeIndicatorType(false)), + Collections.singletonList("pts")); RelNode root = builder.scan("src") .scan("lookup") .snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME)) - .join( + .filter( + builder.equals( + builder.field( + builder.getRexBuilder().makeCorrel(aType, cid), + "a"), + builder.getRexBuilder().makeInputRef(aType, 0))) + .correlate( JoinRelType.INNER, - builder.equals(builder.field(2, 0, "a"), builder.field(2, 1, "a"))) + cid, + builder.getRexBuilder().makeInputRef(aType, 0), + builder.getRexBuilder().makeInputRef(ptsType, 1)) .project(builder.field(1, 0, "a")) - .hints(LookupJoinHintTestUtil.getLookupJoinHint("lookup", false, true)) + .hints(RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("t1").build()) + .hints(LookupJoinHintTestUtil.getLookupJoinHint("d", true, false)) .build(); verifyRelPlan(root); } @Test - public void testClearLookupHintWithInvalidPropagationToViewWhileViewHasLookupHints() { - // SELECT /*+ LOOKUP('table'='lookup', 'retry-predicate'='lookup_miss', - // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10') ) */ * - // FROM ( - // SELECT /*+ LOOKUP('table'='lookup', 'async'='true', 'output-mode'='allow_unordered', - // 'capacity'='1000', 'time-out'='300 s' - // src.a, src.proctime - // FROM src - // JOIN lookup FOR SYSTEM_TIME AS OF T.proctime AS D - // ON T.a = D.id - // ) t1 JOIN lookup FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.a = t2.a + public void testClearLookupHintWithInvalidPropagationToSubQuery() { + // SELECT /*+ LOOKUP('table'='src', 'retry-predicate'='lookup_miss', + // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10', + // 'async'='true', 'output-mode'='allow_unordered','capacity'='1000', 'time-out'='300 s') + // */ t1.a + // FROM ( + // SELECT s.a + // FROM src s + // JOIN lookup FOR SYSTEM_TIME AS OF s.pts AS d + // ON s.a=d.a + // ) t1 + // JOIN src t2 + // ON t1.a=t2.a + + CorrelationId cid = builder.getCluster().createCorrel(); + RelDataType aType = + builder.getTypeFactory() + .createStructType( + Collections.singletonList( + builder.getTypeFactory().createSqlType(SqlTypeName.BIGINT)), + Collections.singletonList("a")); + RelDataType ptsType = + builder.getTypeFactory() + .createStructType( + Collections.singletonList( + builder.getTypeFactory() + .createProctimeIndicatorType(false)), + Collections.singletonList("pts")); RelNode root = builder.scan("src") .scan("lookup") .snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME)) - .join( + .filter( + builder.equals( + builder.field( + builder.getRexBuilder().makeCorrel(aType, cid), + "a"), + builder.getRexBuilder().makeInputRef(aType, 0))) + .correlate( JoinRelType.INNER, - builder.equals(builder.field(2, 0, "a"), builder.field(2, 1, "a"))) + cid, + builder.getRexBuilder().makeInputRef(aType, 0), + builder.getRexBuilder().makeInputRef(ptsType, 1)) .project(builder.field(1, 0, "a")) - .hints(LookupJoinHintTestUtil.getLookupJoinHint("lookup", false, true)) .hints(RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("t1").build()) + .hints(LookupJoinHintTestUtil.getLookupJoinHint("d", true, false)) .scan("src") - .snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME)) + .hints(RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("t2").build()) .join( JoinRelType.INNER, builder.equals(builder.field(2, 0, "a"), builder.field(2, 1, "a"))) .project(builder.field(1, 0, "a")) - .hints(LookupJoinHintTestUtil.getLookupJoinHint("lookup", true, false)) + .hints(LookupJoinHintTestUtil.getLookupJoinHint("src", true, true)) .build(); verifyRelPlan(root); } @Test - public void testClearLookupHintWithInvalidPropagationToSubQuery() { - // SELECT /*+ LOOKUP('table'='lookup', 'retry-predicate'='lookup_miss', - // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10', - // 'async'='true', 'output-mode'='allow_unordered','capacity'='1000', 'time-out'='300 s' */* - // FROM ( - // SELECT src.a - // FROM src - // JOIN lookup FOR SYSTEM_TIME AS OF T.proctime AS D - // ON T.a = D.id - // ) t1 JOIN src t2 ON t1.a = t2.a + public void testNoNeedToClearLookupHintWhileJoinWithUnnest() { + // SELECT /*+ LOOKUP('table'='d', 'retry-predicate'='lookup_miss', + // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10', + // 'async'='true', 'output-mode'='allow_unordered','capacity'='1000', 'time-out'='300 s') + // */ s.a + // FROM src s + // CROSS JOIN UNNEST(s.ds) AS d(a) + + System.out.println( Review Comment: the debug info should be removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org