yuxiqian commented on code in PR #4022: URL: https://github.com/apache/flink-cdc/pull/4022#discussion_r2100077618
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java:
##########
@@ -530,4 +530,55 @@ void testAddColumnWithPosition(String metastore)
         Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);
     }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"filesystem", "hive"})
+    public void testCreateTableWithComment(String metastore)
+            throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
+        initialize(metastore);
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("bucket", "-1");
+        MetadataApplier metadataApplier =
+                new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.table_with_comment"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "col1",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull(),
+                                        "comment of col1")
+                                .physicalColumn(
+                                        "col2",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                        "comment of col2")
+                                .physicalColumn(
+                                        "col3",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                        "comment of col3")
+                                .physicalColumn(
+                                        "col4",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                        "comment of col4")
+                                .comment("comment of table_with_comment")
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+        Table table = catalog.getTable(Identifier.fromString("test.table_with_comment"));
+        RowType tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(
+                                        0, "col1", DataTypes.STRING().notNull(), "comment of col1"),
+                                new DataField(1, "col2", DataTypes.STRING(), "comment of col2"),
+                                new DataField(2, "col3", DataTypes.STRING(), "comment of col3"),
+                                new DataField(3, "col4", DataTypes.STRING(), "comment of col4")));
+        Assertions.assertThat(table.rowType()).isEqualTo(tableSchema);
+        Assertions.assertThat(table.primaryKeys()).isEmpty();
+        Assertions.assertThat(table.partitionKeys()).isEmpty();
+        Assertions.assertThat(table.options()).containsEntry("bucket", "-1");
+        Assertions.assertThat(table.comment().orElse(""))
+                .isEqualTo("comment of table_with_comment");

Review Comment:
   nit: this could be simplified to `assertThat(table.comment()).contains(...)`

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org