hackergin commented on code in PR #24844:
URL: https://github.com/apache/flink/pull/24844#discussion_r1616517183


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -909,47 +694,172 @@ void testDropMaterializedTable(@InjectClusterClient 
RestClusterClient<?> restClu
                                         .asSerializableString()));
     }
 
-    private SessionHandle initializeSession() {
-        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
-        String catalogDDL =
-                String.format(
-                        "CREATE CATALOG %s\n"
-                                + "WITH (\n"
-                                + "  'type' = 'test-filesystem',\n"
-                                + "  'path' = '%s',\n"
-                                + "  'default-database' = '%s'\n"
-                                + "  )",
-                        fileSystemCatalogName, fileSystemCatalogPath, 
TEST_DEFAULT_DATABASE);
-        service.configureSession(sessionHandle, catalogDDL, -1);
-        service.configureSession(
-                sessionHandle, String.format("USE CATALOG %s", 
fileSystemCatalogName), -1);
-
-        // create source table
-        String dataGenSource =
-                "CREATE TABLE datagenSource (\n"
-                        + "  order_id BIGINT,\n"
-                        + "  order_number VARCHAR(20),\n"
-                        + "  user_id BIGINT,\n"
-                        + "  shop_id BIGINT,\n"
-                        + "  product_id BIGINT,\n"
-                        + "  status BIGINT,\n"
-                        + "  order_type BIGINT,\n"
-                        + "  order_created_at TIMESTAMP,\n"
-                        + "  payment_amount_cents BIGINT\n"
-                        + ")\n"
-                        + "WITH (\n"
-                        + "  'connector' = 'datagen',\n"
-                        + "  'rows-per-second' = '10'\n"
-                        + ")";
-        service.configureSession(sessionHandle, dataGenSource, -1);
-        return sessionHandle;
+    @Test
+    void testRefreshMaterializedTable() throws Exception {
+        long timeout = Duration.ofSeconds(20).toMillis();
+        long pause = Duration.ofSeconds(2).toMillis();
+
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+        data.add(Row.of(2L, 2L, 2L, "2024-01-01"));
+        data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+        data.add(Row.of(4L, 4L, 4L, "2024-01-02"));
+        data.add(Row.of(5L, 5L, 5L, "2024-01-03"));
+        data.add(Row.of(6L, 6L, 6L, "2024-01-03"));
+        String dataId = TestValuesTableFactory.registerData(data);
+
+        
createAndVerifyCreateMaterializedTableWithData("my_materialized_table", dataId, 
data);
+
+        // remove element of partition '2024-01-02'
+        removePartitionValue(data, "2024-01-02");
+
+        // refresh the materialized table with static partition
+        long startTime = System.currentTimeMillis();
+        Map<String, String> staticPartitions = new HashMap<>();
+        staticPartitions.put("ds", "2024-01-02");
+        ObjectIdentifier objectIdentifier =
+                ObjectIdentifier.of(
+                        fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table");
+        OperationHandle refreshTableHandle =
+                service.refreshMaterializedTable(
+                        sessionHandle,
+                        objectIdentifier.asSerializableString(),
+                        false,
+                        null,
+                        Collections.emptyMap(),
+                        staticPartitions,
+                        Collections.emptyMap());
+
+        awaitOperationTermination(service, sessionHandle, refreshTableHandle);
+        List<RowData> result = fetchAllResults(service, sessionHandle, 
refreshTableHandle);
+        assertThat(result.size()).isEqualTo(1);
+        String jobId = result.get(0).getString(0).toString();
+
+        // 1. verify fresh job created
+        verifyRefreshJobCreated(restClusterClient, jobId, startTime);
+
+        // 2. verify the new job overwrite the data
+        CommonTestUtils.waitUtil(
+                () ->
+                        fetchTableData(sessionHandle, "SELECT * FROM 
my_materialized_table").size()
+                                == data.size(),
+                Duration.ofMillis(timeout),
+                Duration.ofMillis(pause),
+                "Failed to verify the data in materialized table.");
+        assertThat(
+                        fetchTableData(
+                                        sessionHandle,
+                                        "SELECT * FROM my_materialized_table 
where ds = '2024-01-02'")
+                                .size())
+                .isEqualTo(1);
+
+        // remove element of partition '2024-01-03' and '2024-01-01'
+        // test refresh job only fresh partition '2024-01-03'
+        removePartitionValue(data, "2024-01-01");
+        removePartitionValue(data, "2024-01-03");
+        // refresh the materialized with period schedule
+        startTime = System.currentTimeMillis();
+        OperationHandle periodRefreshTableHandle =
+                service.refreshMaterializedTable(
+                        sessionHandle,
+                        objectIdentifier.asSerializableString(),
+                        true,
+                        "2024-01-03 00:00:00",
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+
+        awaitOperationTermination(service, sessionHandle, 
periodRefreshTableHandle);
+        List<RowData> periodRefreshResult =
+                fetchAllResults(service, sessionHandle, 
periodRefreshTableHandle);
+        assertThat(periodRefreshResult.size()).isEqualTo(1);
+        String periodJobId = 
periodRefreshResult.get(0).getString(0).toString();
+
+        // 1. verify fresh job created
+        verifyRefreshJobCreated(restClusterClient, periodJobId, startTime);
+
+        // 2. verify the new job overwrite the data
+        assertThat(
+                        fetchTableData(
+                                        sessionHandle,
+                                        "SELECT * FROM my_materialized_table 
where ds = '2024-01-03'")
+                                .size())
+                .isEqualTo(getPartitionSize(data, "2024-01-03"));
+        // verify the data of partition '2024-01-01' is not changed
+        assertThat(
+                        fetchTableData(
+                                        sessionHandle,
+                                        "SELECT * FROM my_materialized_table 
where ds = '2024-01-01'")
+                                .size())
+                .isNotEqualTo(getPartitionSize(data, "2024-01-01"));
+
+        // refresh the materialized table with schedule time not specified
+        OperationHandle invalidRefreshTableHandle1 =
+                service.refreshMaterializedTable(
+                        sessionHandle,
+                        objectIdentifier.asSerializableString(),
+                        true,
+                        null,
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service, sessionHandle, 
invalidRefreshTableHandle1))
+                .rootCause()
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        String.format(
+                                "Scheduler time not properly set for periodic 
refresh of table %s",
+                                ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "my_materialized_table")
+                                        .asSerializableString()));
+
+        // refresh the materialized table with invalid schedule time
+        String invalidTime = "20240103 00:00:00.000";
+        OperationHandle invalidRefreshTableHandle2 =
+                service.refreshMaterializedTable(
+                        sessionHandle,
+                        objectIdentifier.asSerializableString(),
+                        true,
+                        invalidTime,
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service, sessionHandle, 
invalidRefreshTableHandle2))
+                .rootCause()
+                .isInstanceOf(SqlExecutionException.class)
+                .hasMessage(
+                        String.format(
+                                "Failed to parse a valid partition value for 
the field 'ds' in table %s using the scheduler time '20240103 00:00:00.000' 
based on the date format 'yyyy-MM-dd HH:mm:ss'.",
+                                ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "my_materialized_table")
+                                        .asSerializableString()));
     }
 
-    private List<RowData> fetchTableData(SessionHandle sessionHandle, String 
query) {
-        OperationHandle queryHandle =
-                service.executeStatement(sessionHandle, query, -1, new 
Configuration());
+    private void removePartitionValue(List<Row> data, String partition) {

Review Comment:
   This has already been removed and replaced with adding data corresponding to 
the partition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to