[ https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851955#comment-17851955 ]
Qingsheng Ren commented on FLINK-35149: --------------------------------------- flink-cdc master: 33891869a9fffa2abf8b8ae03915d0ddccdaf5ec > Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not > TwoPhaseCommittingSink > --------------------------------------------------------------------------------------- > > Key: FLINK-35149 > URL: https://issues.apache.org/jira/browse/FLINK-35149 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Affects Versions: cdc-3.1.0 > Reporter: Hongshun Wang > Assignee: Hongshun Wang > Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > Current , when sink is not instanceof TwoPhaseCommittingSink, use > input.transform rather than stream. It means that pre-write topology will be > ignored. > {code:java} > private void sinkTo( > DataStream<Event> input, > Sink<Event> sink, > String sinkName, > OperatorID schemaOperatorID) { > DataStream<Event> stream = input; > // Pre write topology > if (sink instanceof WithPreWriteTopology) { > stream = ((WithPreWriteTopology<Event>) > sink).addPreWriteTopology(stream); > } > if (sink instanceof TwoPhaseCommittingSink) { > addCommittingTopology(sink, stream, sinkName, schemaOperatorID); > } else { > input.transform( > SINK_WRITER_PREFIX + sinkName, > CommittableMessageTypeInfo.noOutput(), > new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID)); > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)