yuxiqian commented on PR #3974:
URL: https://github.com/apache/flink-cdc/pull/3974#issuecomment-2765783565

   Thanks for @lvyanquan's nice work! I've prepared a simple composer ITCase to 
cover this scenario. Could you please integrate it and check if it works?
   
   ```java
   // In `FlinkPipelineTransformITCase.java`
   
   /**
    * Composes and runs a pipeline from the values source to the values sink with two transform
    * rules whose projections append a computed column: a constant aliased as {@code name} for
    * {@code ns.sm.tbl1}, and {@code CAST(id AS DOUBLE)} aliased back to {@code id} for
    * {@code ns.sm.tbl2}. The pipeline runs with parallelism 1 and the EVOLVE schema change
    * behavior, and the captured output lines are then asserted.
    *
    * <p>NOTE(review): {@code generateSchemaEvolutionEvents} and {@code outCaptor} are not part
    * of this snippet; presumably they are provided by the enclosing test class - confirm.
    *
    * @throws Exception if composing or executing the pipeline fails
    */
   @Test
   void testShadeColumns() throws Exception {
       FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

       // Source: replay a custom, test-supplied list of events.
       Configuration sourceConfig = new Configuration();
       sourceConfig.set(
               ValuesDataSourceOptions.EVENT_SET_ID,
               ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
       SourceDef sourceDef =
               new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

       // The same generated event sequence is emitted for both tables, one after the other.
       TableId tableId1 = TableId.tableId("ns", "sm", "tbl1");
       TableId tableId2 = TableId.tableId("ns", "sm", "tbl2");
       List<Event> events = generateSchemaEvolutionEvents(tableId1);
       events.addAll(generateSchemaEvolutionEvents(tableId2));
       ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

       // Sink: values sink with default configuration.
       Configuration sinkConfig = new Configuration();
       SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

       Configuration pipelineConfig = new Configuration();
       pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
       pipelineConfig.set(
               PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);

       PipelineDef pipelineDef =
               new PipelineDef(
                       sourceDef,
                       sinkDef,
                       Collections.emptyList(),
                       Arrays.asList(
                               // tbl1: append a constant column aliased as `name`.
                               new TransformDef(
                                       "ns.sm.tbl1",
                                       "*, 42 AS name",
                                       null,
                                       null,
                                       null,
                                       null,
                                       "Schema evolution of a shaded column",
                                       null),
                               // tbl2: re-alias a cast of `id` as `id`, and pass a condition
                               // on `id` as the third argument (presumably the row filter).
                               new TransformDef(
                                       "ns.sm.tbl2",
                                       "*, CAST(id AS DOUBLE) AS id",
                                       "1.9 < id AND id < 2.1",
                                       null,
                                       null,
                                       null,
                                       "Shade a column based on its original value",
                                       null)),
                       Collections.emptyList(),
                       pipelineConfig);

       PipelineExecution execution = composer.compose(pipelineDef);
       execution.execute();

       String[] outputEvents = outCaptor.toString().trim().split("\n");
       // Placeholder from the original snippet: replace `...` with the expected output lines.
       assertThat(outputEvents).containsExactly(...);
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to