ruanhang1993 commented on code in PR #23507:
URL: https://github.com/apache/flink/pull/23507#discussion_r1405905020


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java:
##########
@@ -190,4 +231,76 @@ private static List<String> readResultFiles(Path path) throws IOException {
         }
         return result;
     }
+
+    protected List<String> formatRawResult(List<String> rawResults) {

Review Comment:
   Add some docs for this method explaining how to use or override it.
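
   For example, something along these lines (a sketch only; the wording is inferred from the surrounding code, not prescribed):
   ```java
   /**
    * Hook for post-processing the raw strings read from the result files
    * before they are compared against the expected results. The default
    * implementation returns the input unchanged; subclasses can override
    * this, e.g. to materialize a changelog via
    * {@link #convertToMaterializedResult}.
    */
   protected List<String> formatRawResult(List<String> rawResults) {
       return rawResults;
   }
   ```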



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java:
##########
@@ -190,4 +231,76 @@ private static List<String> readResultFiles(Path path) throws IOException {
         }
         return result;
     }
+
+    protected List<String> formatRawResult(List<String> rawResults) {
+        return rawResults;
+    }
+
+    protected static List<String> convertToMaterializedResult(
+            List<String> rawResults,
+            ResolvedSchema schema,
+            DeserializationSchema<RowData> deserializationSchema) {
+        DataCollector collector = new DataCollector();
+        try {
+            deserializationSchema.open(new TestingDeserializationContext());
+            for (String rawResult : rawResults) {
+                deserializationSchema.deserialize(rawResult.getBytes(), collector);
+            }
+        } catch (Exception e) {
+            fail("deserialize error: ", e);
+        }
+
+        RowRowConverter converter = RowRowConverter.create(schema.toPhysicalRowDataType());
+        Map<Row, Row> upsertResult = new HashMap<>();
+
+        for (RowData rowData : collector.dataList) {
+            RowKind kind = rowData.getRowKind();
+
+            Row row = converter.toExternal(rowData);
+            assertThat(row).isNotNull();
+
+            Row key = Row.project(row, schema.getPrimaryKeyIndexes());
+            key.setKind(RowKind.INSERT);
+
+            Row upsertRow = Row.copy(row);
+            upsertRow.setKind(RowKind.INSERT);
+
+            if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+                upsertResult.put(key, upsertRow);
+            } else {
+                Row oldValue = upsertResult.remove(key);
+                if (oldValue == null) {
+                    throw new RuntimeException(
+                            "Tried to delete a value that wasn't inserted 
first. "
+                                    + "This is probably an incorrectly 
implemented test.");
+                }
+            }
+        }
+
+        return upsertResult.values().stream().map(Row::toString).collect(Collectors.toList());
+    }
+
+    protected static DebeziumJsonDeserializationSchema createDebeziumDeserializationSchema(
+            ResolvedSchema schema) {
+        return new DebeziumJsonDeserializationSchema(
+                schema.toPhysicalRowDataType(),
+                Collections.emptyList(),
+                InternalTypeInfo.of(schema.toPhysicalRowDataType().getLogicalType()),
+                false,
+                true,
+                TimestampFormat.ISO_8601);
+    }
+

Review Comment:
   Add some Javadocs.
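
   For instance (a rough sketch, inferring intent from the implementation; adjust freely):
   ```java
   /**
    * Deserializes the raw changelog lines with the given
    * {@code deserializationSchema} and materializes them into the final
    * upsert view, keyed by the primary key of {@code schema}. Fails the
    * test if a retraction arrives for a key that was never inserted.
    */
   protected static List<String> convertToMaterializedResult(
           List<String> rawResults,
           ResolvedSchema schema,
           DeserializationSchema<RowData> deserializationSchema) {
       // ... unchanged ...
   }

   /**
    * Creates a Debezium JSON deserialization schema for the physical row
    * type of the given {@code schema}.
    */
   protected static DebeziumJsonDeserializationSchema createDebeziumDeserializationSchema(
           ResolvedSchema schema) {
       // ... unchanged ...
   }
   ```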



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java:
##########
@@ -190,4 +231,76 @@ private static List<String> readResultFiles(Path path) throws IOException {
         }
         return result;
     }
+
+    protected List<String> formatRawResult(List<String> rawResults) {
+        return rawResults;
+    }
+
+    protected static List<String> convertToMaterializedResult(
+            List<String> rawResults,
+            ResolvedSchema schema,
+            DeserializationSchema<RowData> deserializationSchema) {
+        DataCollector collector = new DataCollector();
+        try {
+            deserializationSchema.open(new TestingDeserializationContext());
+            for (String rawResult : rawResults) {
+                deserializationSchema.deserialize(rawResult.getBytes(), collector);
+            }
+        } catch (Exception e) {
+            fail("deserialize error: ", e);
+        }
+
+        RowRowConverter converter = RowRowConverter.create(schema.toPhysicalRowDataType());
+        Map<Row, Row> upsertResult = new HashMap<>();
+
+        for (RowData rowData : collector.dataList) {
+            RowKind kind = rowData.getRowKind();
+
+            Row row = converter.toExternal(rowData);
+            assertThat(row).isNotNull();
+
+            Row key = Row.project(row, schema.getPrimaryKeyIndexes());
+            key.setKind(RowKind.INSERT);
+
+            Row upsertRow = Row.copy(row);
+            upsertRow.setKind(RowKind.INSERT);

Review Comment:
   `upsertRow` is only needed when the kind is `INSERT` or `UPDATE_AFTER`, so we could create it only inside the `kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER` branch, as in the sketch below.
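
   i.e. something like this (a sketch of the suggested reshuffle, behavior unchanged):
   ```java
   if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
       // Create the upsert copy only on the branch that actually stores it.
       Row upsertRow = Row.copy(row);
       upsertRow.setKind(RowKind.INSERT);
       upsertResult.put(key, upsertRow);
   } else {
       Row oldValue = upsertResult.remove(key);
       if (oldValue == null) {
           throw new RuntimeException(
                   "Tried to delete a value that wasn't inserted first. "
                           + "This is probably an incorrectly implemented test.");
       }
   }
   ```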



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
