JingsongLi commented on code in PR #596:
URL: https://github.com/apache/flink-table-store/pull/596#discussion_r1133866492


##########
flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java:
##########
@@ -130,57 +131,78 @@ public void testVariousChangelogProducer(
     }
 
     @Test
-    public void testUsingSource() throws Exception {
+    public void testTargetAlias() throws Exception {
         // prepare table T
         prepareTable(CoreOptions.ChangelogProducer.NONE);
 
-        // similar to:
-        // MERGE INTO T
-        // USING (SELECT * FROM S WHERE k < 12) AS SS
-        // ON T.k = SS.k AND T.dt = SS.dt
-        // WHEN MATCHED AND (T.v <> SS.v AND SS.v IS NOT NULL) THEN UPDATE
-        //   SET v = SS.v, last_action = 'matched_upsert'
-        // WHEN MATCHED AND SS.v IS NULL THEN DELETE
-        // WHEN NOT MATCHED THEN INSERT VALUES (SS.k, SS.v, 'insert', SS.dt)
-        // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
-        //   SET v = v || '_nmu', last_action = 'not_matched_upsert'
-        // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
         MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
-        action.withSource("SELECT * FROM S WHERE k < 12", "SS")
-                .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
-                .withMatchedUpsert(
-                        "T.v <> SS.v AND SS.v IS NOT NULL",
-                        "v = SS.v, last_action = 'matched_upsert'")
-                .withMatchedDelete("SS.v IS NULL")
-                .withNotMatchedInsert(null, "SS.k, SS.v, 'insert', SS.dt")
-                .withNotMatchedBySourceUpsert(
-                        "dt < '02-28'", "v = v || '_nmu', last_action = 
'not_matched_upsert'")
-                .withNotMatchedBySourceDelete("dt >= '02-28'");
+        action.withTargetAlias("TT")
+                .withSourceTable(null, "S")
+                .withMergeCondition("TT.k = S.k AND TT.dt = S.dt")
+                .withMatchedDelete("S.v IS NULL");
 
         validateActionRunResult(
                 action,
                 Arrays.asList(
-                        changelogRow("-U", 7, "v_7", "creation", "02-28"),
-                        changelogRow("+U", 7, "Seven", "matched_upsert", 
"02-28"),
                         changelogRow("-D", 4, "v_4", "creation", "02-27"),
-                        changelogRow("-D", 8, "v_8", "creation", "02-28"),
-                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
-                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
-                        changelogRow("-U", 2, "v_2", "creation", "02-27"),
-                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("-U", 3, "v_3", "creation", "02-27"),
-                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("-D", 5, "v_5", "creation", "02-28"),
-                        changelogRow("-D", 6, "v_6", "creation", "02-28"),
-                        changelogRow("-D", 9, "v_9", "creation", "02-28"),
-                        changelogRow("-D", 10, "v_10", "creation", "02-28")),
+                        changelogRow("-D", 8, "v_8", "creation", "02-28")),
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
-                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", 
"02-27"),
-                        changelogRow("+U", 7, "Seven", "matched_upsert", 
"02-28"),
-                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
-                        changelogRow("+I", 11, "v_11", "insert", "02-29")));
+                        changelogRow("+I", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+I", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                        changelogRow("+I", 7, "v_7", "creation", "02-28"),
+                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
+    }
+
+    @Test
+    public void testUsingDdlSource() throws Exception {
+        // prepare table T
+        prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+        TestValuesTableFactory.registerData(
+                Arrays.asList(
+                        changelogRow("+I", 1, "v_1", "02-27"),
+                        changelogRow("+I", 4, null, "02-27"),
+                        changelogRow("+I", 8, null, "02-28")));
+
+        String catalog =
+                String.format(
+                        "CREATE CATALOG test_cat WITH ('type' = 'table-store', 
'warehouse' = '%s')",
+                        getTempDirPath());
+        String useCatalog = "USE CATALOG test_cat";
+        String id =
+                TestValuesTableFactory.registerData(
+                        Arrays.asList(
+                                changelogRow("+I", 1, "v_1", "02-27"),
+                                changelogRow("+I", 4, null, "02-27"),
+                                changelogRow("+I", 8, null, "02-28")));
+        String ddl =
+                String.format(
+                        "CREATE TEMPORARY TABLE S (k INT, v STRING, dt 
STRING)\n"
+                                + "WITH ('connector' = 'values', 'bounded' = 
'true', 'data-id' = '%s');",
+                        id);
+
+        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        action.withSqlSource("test_cat.default.S", catalog, useCatalog, ddl)

Review Comment:
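   Use the unqualified name `S` here instead of `test_cat.default.S`.
   Also add a comment explaining that this case tests resolving the source table against the current catalog and current database.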



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to