lincoln-lil commented on code in PR #21545:
URL: https://github.com/apache/flink/pull/21545#discussion_r1101012560


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java:
##########
@@ -61,89 +76,208 @@ public void before() throws Exception {
                                 + ") WITH (\n"
                                 + " 'connector' = 'values'\n"
                                 + ")");
+        util.tableEnv()
+                .createTemporarySystemFunction(
+                        "MockOffset",
+                        new 
ClearLookupJoinHintWithInvalidPropagationShuttleTest()
+                        .new MockOffsetTableFunction());
     }
 
     @Test
     public void testNoNeedToClearLookupHint() {
         // SELECT /*+ LOOKUP('table'='lookup', 'retry-predicate'='lookup_miss',
-        // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 
'max-attempts'='10') ) */ *
-        //  FROM src
-        //  JOIN lookup FOR SYSTEM_TIME AS OF T.proctime AS D
-        //      ON T.a = D.a
+        // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 
'max-attempts'='10',
+        // 'async'='true', 'output-mode'='allow_unordered','capacity'='1000', 
'time-out'='300 s')
+        // */ s.a
+        // FROM src s
+        // JOIN lookup FOR SYSTEM_TIME AS OF s.pts AS d
+        // ON s.a=d.a
+        CorrelationId cid = builder.getCluster().createCorrel();
+        RelDataType aType =
+                builder.getTypeFactory()
+                        .createStructType(
+                                Collections.singletonList(
+                                        
builder.getTypeFactory().createSqlType(SqlTypeName.BIGINT)),
+                                Collections.singletonList("a"));
+        RelDataType ptsType =
+                builder.getTypeFactory()
+                        .createStructType(
+                                Collections.singletonList(
+                                        builder.getTypeFactory()
+                                                
.createProctimeIndicatorType(false)),
+                                Collections.singletonList("pts"));
         RelNode root =
                 builder.scan("src")
                         .scan("lookup")
                         
.snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME))
-                        .join(
+                        .filter(
+                                builder.equals(
+                                        builder.field(
+                                                
builder.getRexBuilder().makeCorrel(aType, cid),
+                                                "a"),
+                                        
builder.getRexBuilder().makeInputRef(aType, 0)))
+                        .correlate(
                                 JoinRelType.INNER,
-                                builder.equals(builder.field(2, 0, "a"), 
builder.field(2, 1, "a")))
+                                cid,
+                                builder.getRexBuilder().makeInputRef(aType, 0),
+                                builder.getRexBuilder().makeInputRef(ptsType, 
1))
                         .project(builder.field(1, 0, "a"))
-                        
.hints(LookupJoinHintTestUtil.getLookupJoinHint("lookup", false, true))
+                        
.hints(RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("t1").build())
+                        .hints(LookupJoinHintTestUtil.getLookupJoinHint("d", 
true, false))
                         .build();
         verifyRelPlan(root);
     }
 
     @Test
-    public void 
testClearLookupHintWithInvalidPropagationToViewWhileViewHasLookupHints() {
-        // SELECT /*+ LOOKUP('table'='lookup', 'retry-predicate'='lookup_miss',
-        // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 
'max-attempts'='10') ) */ *
-        //   FROM (
-        //     SELECT /*+ LOOKUP('table'='lookup', 'async'='true', 
'output-mode'='allow_unordered',
-        // 'capacity'='1000', 'time-out'='300 s'
-        //       src.a, src.proctime
-        //     FROM src
-        //       JOIN lookup FOR SYSTEM_TIME AS OF T.proctime AS D
-        //         ON T.a = D.id
-        //     ) t1 JOIN lookup FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON 
t1.a = t2.a
+    public void testClearLookupHintWithInvalidPropagationToSubQuery() {
+        // SELECT /*+ LOOKUP('table'='src', 'retry-predicate'='lookup_miss',
+        // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 
'max-attempts'='10',
+        // 'async'='true', 'output-mode'='allow_unordered','capacity'='1000', 
'time-out'='300 s')
+        // */ t1.a
+        //  FROM (
+        //      SELECT s.a
+        //      FROM src s
+        //      JOIN lookup FOR SYSTEM_TIME AS OF s.pts AS d
+        //      ON s.a=d.a
+        //  ) t1
+        //  JOIN src t2
+        //  ON t1.a=t2.a
+
+        CorrelationId cid = builder.getCluster().createCorrel();
+        RelDataType aType =
+                builder.getTypeFactory()
+                        .createStructType(
+                                Collections.singletonList(
+                                        
builder.getTypeFactory().createSqlType(SqlTypeName.BIGINT)),
+                                Collections.singletonList("a"));
+        RelDataType ptsType =
+                builder.getTypeFactory()
+                        .createStructType(
+                                Collections.singletonList(
+                                        builder.getTypeFactory()
+                                                
.createProctimeIndicatorType(false)),
+                                Collections.singletonList("pts"));
         RelNode root =
                 builder.scan("src")
                         .scan("lookup")
                         
.snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME))
-                        .join(
+                        .filter(
+                                builder.equals(
+                                        builder.field(
+                                                
builder.getRexBuilder().makeCorrel(aType, cid),
+                                                "a"),
+                                        
builder.getRexBuilder().makeInputRef(aType, 0)))
+                        .correlate(
                                 JoinRelType.INNER,
-                                builder.equals(builder.field(2, 0, "a"), 
builder.field(2, 1, "a")))
+                                cid,
+                                builder.getRexBuilder().makeInputRef(aType, 0),
+                                builder.getRexBuilder().makeInputRef(ptsType, 
1))
                         .project(builder.field(1, 0, "a"))
-                        
.hints(LookupJoinHintTestUtil.getLookupJoinHint("lookup", false, true))
                         
.hints(RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("t1").build())
+                        .hints(LookupJoinHintTestUtil.getLookupJoinHint("d", 
true, false))
                         .scan("src")
-                        
.snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME))
+                        
.hints(RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("t2").build())
                         .join(
                                 JoinRelType.INNER,
                                 builder.equals(builder.field(2, 0, "a"), 
builder.field(2, 1, "a")))
                         .project(builder.field(1, 0, "a"))
-                        
.hints(LookupJoinHintTestUtil.getLookupJoinHint("lookup", true, false))
+                        .hints(LookupJoinHintTestUtil.getLookupJoinHint("src", 
true, true))
                         .build();
         verifyRelPlan(root);
     }
 
     @Test
-    public void testClearLookupHintWithInvalidPropagationToSubQuery() {
-        // SELECT /*+ LOOKUP('table'='lookup', 'retry-predicate'='lookup_miss',
-        // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 
'max-attempts'='10',
-        // 'async'='true', 'output-mode'='allow_unordered','capacity'='1000', 
'time-out'='300 s' */*
-        //   FROM (
-        //     SELECT src.a
-        //     FROM src
-        //     JOIN lookup FOR SYSTEM_TIME AS OF T.proctime AS D
-        //       ON T.a = D.id
-        //   ) t1 JOIN src t2 ON t1.a = t2.a
+    public void testNoNeedToClearLookupHintWhileJoinWithUnnest() {
+        //  SELECT /*+ LOOKUP('table'='d', 'retry-predicate'='lookup_miss',
+        //  'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 
'max-attempts'='10',
+        //  'async'='true', 'output-mode'='allow_unordered','capacity'='1000', 
'time-out'='300 s')
+        //  */ s.a
+        //  FROM src s
+        //  CROSS JOIN UNNEST(s.ds) AS d(a)
+
+        System.out.println(

Review Comment:
   the debug info should be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to