ruanhang1993 commented on code in PR #23507: URL: https://github.com/apache/flink/pull/23507#discussion_r1405905020
########## flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java: ########## @@ -190,4 +231,76 @@ private static List<String> readResultFiles(Path path) throws IOException { } return result; } + + protected List<String> formatRawResult(List<String> rawResults) { Review Comment: Add some docs for this method on how to use or override it. ########## flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java: ########## @@ -190,4 +231,76 @@ private static List<String> readResultFiles(Path path) throws IOException { } return result; } + + protected List<String> formatRawResult(List<String> rawResults) { + return rawResults; + } + + protected static List<String> convertToMaterializedResult( + List<String> rawResults, + ResolvedSchema schema, + DeserializationSchema<RowData> deserializationSchema) { + DataCollector collector = new DataCollector(); + try { + deserializationSchema.open(new TestingDeserializationContext()); + for (String rawResult : rawResults) { + deserializationSchema.deserialize(rawResult.getBytes(), collector); + } + } catch (Exception e) { + fail("deserialize error: ", e); + } + + RowRowConverter converter = RowRowConverter.create(schema.toPhysicalRowDataType()); + Map<Row, Row> upsertResult = new HashMap<>(); + + for (RowData rowData : collector.dataList) { + RowKind kind = rowData.getRowKind(); + + Row row = converter.toExternal(rowData); + assertThat(row).isNotNull(); + + Row key = Row.project(row, schema.getPrimaryKeyIndexes()); + key.setKind(RowKind.INSERT); + + Row upsertRow = Row.copy(row); + upsertRow.setKind(RowKind.INSERT); + + if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) { + upsertResult.put(key, upsertRow); + } else { + Row oldValue = upsertResult.remove(key); + if (oldValue == null) { + throw new RuntimeException( + "Tried to delete a value that wasn't inserted first. " + + "This is probably an incorrectly implemented test."); + } + } + } + + return upsertResult.values().stream().map(Row::toString).collect(Collectors.toList()); + } + + protected static DebeziumJsonDeserializationSchema createDebeziumDeserializationSchema( + ResolvedSchema schema) { + return new DebeziumJsonDeserializationSchema( + schema.toPhysicalRowDataType(), + Collections.emptyList(), + InternalTypeInfo.of(schema.toPhysicalRowDataType().getLogicalType()), + false, + true, + TimestampFormat.ISO_8601); + } + Review Comment: add some java docs ########## flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java: ########## @@ -190,4 +231,76 @@ private static List<String> readResultFiles(Path path) throws IOException { } return result; } + + protected List<String> formatRawResult(List<String> rawResults) { + return rawResults; + } + + protected static List<String> convertToMaterializedResult( + List<String> rawResults, + ResolvedSchema schema, + DeserializationSchema<RowData> deserializationSchema) { + DataCollector collector = new DataCollector(); + try { + deserializationSchema.open(new TestingDeserializationContext()); + for (String rawResult : rawResults) { + deserializationSchema.deserialize(rawResult.getBytes(), collector); + } + } catch (Exception e) { + fail("deserialize error: ", e); + } + + RowRowConverter converter = RowRowConverter.create(schema.toPhysicalRowDataType()); + Map<Row, Row> upsertResult = new HashMap<>(); + + for (RowData rowData : collector.dataList) { + RowKind kind = rowData.getRowKind(); + + Row row = converter.toExternal(rowData); + assertThat(row).isNotNull(); + + Row key = Row.project(row, schema.getPrimaryKeyIndexes()); + key.setKind(RowKind.INSERT); + + Row upsertRow = Row.copy(row); + upsertRow.setKind(RowKind.INSERT); Review Comment: `upsertRow` is only useful when the kind is insert or update_after. So we could create it when `kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org