yuxiqian commented on PR #3974: URL: https://github.com/apache/flink-cdc/pull/3974#issuecomment-2765783565
Thanks for @lvyanquan's nice work! I've prepared a simple composer ITCase to cover this scenario. Could you please integrate it and check whether it works?

```java
// In `FlinkPipelineTransformITCase.java`
@Test
void testShadeColumns() throws Exception {
    FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

    // Source: replay custom events for two tables
    Configuration sourceConfig = new Configuration();
    sourceConfig.set(
            ValuesDataSourceOptions.EVENT_SET_ID,
            ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
    SourceDef sourceDef =
            new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

    TableId tableId1 = TableId.tableId("ns", "sm", "tbl1");
    TableId tableId2 = TableId.tableId("ns", "sm", "tbl2");
    List<Event> events = generateSchemaEvolutionEvents(tableId1);
    events.addAll(generateSchemaEvolutionEvents(tableId2));
    ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

    // Sink
    Configuration sinkConfig = new Configuration();
    SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

    // Pipeline: single parallelism, schema changes evolved downstream
    Configuration pipelineConfig = new Configuration();
    pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
    pipelineConfig.set(
            PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);

    PipelineDef pipelineDef =
            new PipelineDef(
                    sourceDef,
                    sinkDef,
                    Collections.emptyList(),
                    Arrays.asList(
                            // tbl1: projection adds a computed column called `name`
                            new TransformDef(
                                    "ns.sm.tbl1",
                                    "*, 42 AS name",
                                    null,
                                    null,
                                    null,
                                    null,
                                    "Schema evolution of a shaded column",
                                    null),
                            // tbl2: computed `id` is derived from the original `id`
                            new TransformDef(
                                    "ns.sm.tbl2",
                                    "*, CAST(id AS DOUBLE) AS id",
                                    "1.9 < id AND id < 2.1",
                                    null,
                                    null,
                                    null,
                                    "Shade a column based on its original value",
                                    null)),
                    Collections.emptyList(),
                    pipelineConfig);

    PipelineExecution execution = composer.compose(pipelineDef);
    execution.execute();

    String[] outputEvents = outCaptor.toString().trim().split("\n");
    assertThat(outputEvents).containsExactly(...);
}
```