JingsongLi commented on code in PR #596: URL: https://github.com/apache/flink-table-store/pull/596#discussion_r1133866492
########## flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java: ########## @@ -130,57 +131,78 @@ public void testVariousChangelogProducer( } @Test - public void testUsingSource() throws Exception { + public void testTargetAlias() throws Exception { // prepare table T prepareTable(CoreOptions.ChangelogProducer.NONE); - // similar to: - // MERGE INTO T - // USING (SELECT * FROM S WHERE k < 12) AS SS - // ON T.k = SS.k AND T.dt = SS.dt - // WHEN MATCHED AND (T.v <> SS.v AND SS.v IS NOT NULL) THEN UPDATE - // SET v = SS.v, last_action = 'matched_upsert' - // WHEN MATCHED AND SS.v IS NULL THEN DELETE - // WHEN NOT MATCHED THEN INSERT VALUES (SS.k, SS.v, 'insert', SS.dt) - // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE - // SET v = v || '_nmu', last_action = 'not_matched_upsert' - // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE MergeIntoAction action = new MergeIntoAction(warehouse, database, "T"); - action.withSource("SELECT * FROM S WHERE k < 12", "SS") - .withMergeCondition("T.k = SS.k AND T.dt = SS.dt") - .withMatchedUpsert( - "T.v <> SS.v AND SS.v IS NOT NULL", - "v = SS.v, last_action = 'matched_upsert'") - .withMatchedDelete("SS.v IS NULL") - .withNotMatchedInsert(null, "SS.k, SS.v, 'insert', SS.dt") - .withNotMatchedBySourceUpsert( - "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'") - .withNotMatchedBySourceDelete("dt >= '02-28'"); + action.withTargetAlias("TT") + .withSourceTable(null, "S") + .withMergeCondition("TT.k = S.k AND TT.dt = S.dt") + .withMatchedDelete("S.v IS NULL"); validateActionRunResult( action, Arrays.asList( - changelogRow("-U", 7, "v_7", "creation", "02-28"), - changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"), changelogRow("-D", 4, "v_4", "creation", "02-27"), - changelogRow("-D", 8, "v_8", "creation", "02-28"), - changelogRow("+I", 8, "v_8", "insert", "02-29"), - changelogRow("+I", 11, "v_11", 
"insert", "02-29"), - changelogRow("-U", 2, "v_2", "creation", "02-27"), - changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"), - changelogRow("-U", 3, "v_3", "creation", "02-27"), - changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"), - changelogRow("-D", 5, "v_5", "creation", "02-28"), - changelogRow("-D", 6, "v_6", "creation", "02-28"), - changelogRow("-D", 9, "v_9", "creation", "02-28"), - changelogRow("-D", 10, "v_10", "creation", "02-28")), + changelogRow("-D", 8, "v_8", "creation", "02-28")), Arrays.asList( changelogRow("+I", 1, "v_1", "creation", "02-27"), - changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"), - changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"), - changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"), - changelogRow("+I", 8, "v_8", "insert", "02-29"), - changelogRow("+I", 11, "v_11", "insert", "02-29"))); + changelogRow("+I", 2, "v_2", "creation", "02-27"), + changelogRow("+I", 3, "v_3", "creation", "02-27"), + changelogRow("+I", 5, "v_5", "creation", "02-28"), + changelogRow("+I", 6, "v_6", "creation", "02-28"), + changelogRow("+I", 7, "v_7", "creation", "02-28"), + changelogRow("+I", 9, "v_9", "creation", "02-28"), + changelogRow("+I", 10, "v_10", "creation", "02-28"))); + } + + @Test + public void testUsingDdlSource() throws Exception { + // prepare table T + prepareTable(CoreOptions.ChangelogProducer.NONE); + + TestValuesTableFactory.registerData( + Arrays.asList( + changelogRow("+I", 1, "v_1", "02-27"), + changelogRow("+I", 4, null, "02-27"), + changelogRow("+I", 8, null, "02-28"))); + + String catalog = + String.format( + "CREATE CATALOG test_cat WITH ('type' = 'table-store', 'warehouse' = '%s')", + getTempDirPath()); + String useCatalog = "USE CATALOG test_cat"; + String id = + TestValuesTableFactory.registerData( + Arrays.asList( + changelogRow("+I", 1, "v_1", "02-27"), + changelogRow("+I", 4, null, "02-27"), + changelogRow("+I", 8, null, "02-28"))); + String ddl = + 
String.format( + "CREATE TEMPORARY TABLE S (k INT, v STRING, dt STRING)\n" + + "WITH ('connector' = 'values', 'bounded' = 'true', 'data-id' = '%s');", + id); + + MergeIntoAction action = new MergeIntoAction(warehouse, database, "T"); + action.withSqlSource("test_cat.default.S", catalog, useCatalog, ddl) Review Comment: Please change `test_cat.default.S` to `S`, and add a code comment stating that this tests the current catalog and current database. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org