ruanhang1993 commented on code in PR #3812:
URL: https://github.com/apache/flink-cdc/pull/3812#discussion_r2032935652
##########
docs/content.zh/docs/core-concept/data-pipeline.md:
##########
@@ -115,4 +115,5 @@ under the License.
 |-----------------|---------------------------------------|-------------------|
 | name            | The name of the pipeline, which is used as the job name in the Flink cluster. | optional |
 | parallelism     | The global parallelism of the pipeline; the default value is 1.               | optional |
-| local-time-zone | The job-level local time zone.                                                 | optional |
\ No newline at end of file
+| local-time-zone | The job-level local time zone.                                                 | optional |
+| batch-mode.enabled | Only use batch mode to synchronize the current snapshot data.               | optional |

Review Comment:
   I think it is better to use an option `runtime-mode` of enum type with a default value of `STREAMING`, like `RuntimeExecutionMode` in Flink.


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java:
##########
@@ -95,4 +96,35 @@ private TableId resolveReplacement(
         }
         return TableId.parse(route.f1);
     }
+
+    /*
+     * groupSourceTablesByRouteRule
+     *
+     * @param: tableIdSet
+     * @return: java.util.List<java.util.Set<org.apache.flink.cdc.common.event.TableId>>
+     *
+     * Group the source tables that conform to the same routing rule together.
+     * The total number of groups is less than or equal to the number of routing rules.
+     * For the source tables within each group, their table structures will be merged to obtain
+     * the widest table structure in that group. The structures of all tables within the group
+     * will be expanded to this widest table structure.
+     */

Review Comment:
```suggestion
    /**
     * Group the source tables that conform to the same routing rule together.
     * The total number of groups is less than or equal to the number of routing rules.
     * For the source tables within each group, their table structures will be merged to obtain
     * the widest table structure in that group. The structures of all tables within the group
     * will be expanded to this widest table structure.
     *
     * @param tableIdSet The tables that need to be grouped by the router
     * @return The tables grouped by routing rule
     */
```
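   For readers following along, a minimal sketch of what a grouping method matching this Javadoc could look like. It assumes the router keeps its rules in a `List<Tuple2<Selectors, String>> routes` field (consistent with the `route.f1` access in `resolveReplacement` above) and that `Selectors#isMatch` is the matching predicate; the actual implementation in the PR may differ:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch only: `routes` is the assumed rule list described above.
public List<Set<TableId>> groupSourceTablesByRouteRule(Set<TableId> tableIdSet) {
    List<Set<TableId>> groups = new ArrayList<>();
    for (Tuple2<Selectors, String> route : routes) {
        // Collect every source table captured by this routing rule.
        Set<TableId> group =
                tableIdSet.stream()
                        .filter(tableId -> route.f0.isMatch(tableId))
                        .collect(Collectors.toSet());
        // Skipping rules that match nothing keeps the group count
        // at or below the number of routing rules, as the Javadoc states.
        if (!group.isEmpty()) {
            groups.add(group);
        }
    }
    return groups;
}
```

   The per-group merge to the widest table structure described in the comment would then run over each returned set.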
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaManager.java:
##########
@@ -73,6 +73,12 @@ public SchemaManager() {
         behavior = SchemaChangeBehavior.EVOLVE;

Review Comment:
```suggestion
        this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), SchemaChangeBehavior.EVOLVE);
```


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataBatchSinkWriterOperator.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.sink;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+
+/**
+ * An operator that processes records to be written into a {@link Sink} in batch mode.
+ *
+ * <p>The operator is a proxy of SinkWriterOperator in Flink.
+ *
+ * <p>The operator is always part of a sink pipeline and is the first operator.
+ *
+ * @param <CommT> the type of the committable (to send to downstream operators)
+ */
+@Internal
+public class DataBatchSinkWriterOperator<CommT>

Review Comment:
```suggestion
public class BatchDataSinkWriterOperator<CommT>
```


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/DataSourceFactory.java:
##########
@@ -28,4 +28,10 @@ public interface DataSourceFactory extends Factory {

     /** Creates a {@link DataSource} instance. */
     DataSource createDataSource(Context context);
+
+    /** Checking if this {@link DataSource} could be created in batch mode. */
+    default void verifyBatchMode(Context context) {

Review Comment:
```suggestion
    default void verifyRuntimeMode(Context context, RuntimeMode runtimeMode) {
```
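   To make the suggested signature concrete, a rough sketch of a possible default implementation follows. `RuntimeMode` is the hypothetical enum discussed in the comments below (or Flink's `RuntimeExecutionMode`), the streaming-only default and the exception type are assumptions, and `identifier()` is declared on the parent `Factory` interface:

```java
/**
 * Checks whether this factory's {@link DataSource} can be created under the given
 * runtime mode. Sketch of a possible default: reject everything except streaming,
 * and let batch-capable factories override this method.
 */
default void verifyRuntimeMode(Context context, RuntimeMode runtimeMode) {
    if (runtimeMode != RuntimeMode.STREAMING) {
        throw new IllegalArgumentException(
                String.format(
                        "Data source factory '%s' does not support runtime mode %s.",
                        identifier(), runtimeMode));
    }
}
```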
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaManager.java:
##########
@@ -73,6 +73,12 @@ public SchemaManager() {
         behavior = SchemaChangeBehavior.EVOLVE;
     }

+    public SchemaManager(SchemaChangeBehavior schemaChangeBehavior) {
+        evolvedSchemas = new ConcurrentHashMap<>();
+        originalSchemas = new ConcurrentHashMap<>();
+        behavior = schemaChangeBehavior;

Review Comment:
```suggestion
        this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), schemaChangeBehavior);
```


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##########
@@ -45,6 +45,12 @@ public class PipelineOptions {
                 .defaultValue(1)
                 .withDescription("Parallelism of the pipeline");

+    public static final ConfigOption<Boolean> PIPELINE_BATCH_MODE_ENABLED =
+            ConfigOptions.key("batch-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Run pipeline job in batch mode instead of streaming mode");

Review Comment:
```java
public static final ConfigOption<RuntimeMode> PIPELINE_RUNTIME_MODE =
        ConfigOptions.key("runtime-mode")
                .enumType(RuntimeMode.class)
                .defaultValue(STREAMING)
                .withDescription("xxx");
```
   Maybe we could reuse Flink's `RuntimeExecutionMode` instead of introducing a new enum.
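   If Flink's enum were reused as suggested, the option could look roughly like the sketch below. The description text is only a stand-in for the `xxx` placeholder above, and it assumes the CDC `ConfigOptions` builder exposes `enumType` the way Flink's own builder does:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;

public static final ConfigOption<RuntimeExecutionMode> PIPELINE_RUNTIME_MODE =
        ConfigOptions.key("runtime-mode")
                .enumType(RuntimeExecutionMode.class)
                .defaultValue(RuntimeExecutionMode.STREAMING)
                .withDescription(
                        "Runtime mode of the pipeline: STREAMING for continuous "
                                + "synchronization (default), or BATCH to synchronize "
                                + "only the current snapshot data.");
```

   A pipeline YAML would then presumably select it with `runtime-mode: BATCH` under the `pipeline` block. One trade-off to weigh: Flink's `RuntimeExecutionMode` also defines `AUTOMATIC`, which a pipeline would need to either support or reject explicitly at validation time.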
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionBatchOperator.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.partitioning;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Operator for processing events from {@link SchemaOperator} before {@link EventPartitioner} with
+ * regular topology in batch mode.
+ */
+@Internal
+public class RegularPrePartitionBatchOperator extends AbstractStreamOperator<PartitioningEvent>

Review Comment:
```suggestion
public class BatchRegularPrePartitionOperator extends AbstractStreamOperator<PartitioningEvent>
```


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataBatchSinkFunctionOperator.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.sink;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * An operator that processes records to be written into a {@link SinkFunction} in batch mode.
+ *
+ * <p>The operator is a proxy of {@link StreamSink} in Flink.
+ *
+ * <p>The operator is always part of a sink pipeline and is the first operator.
+ */
+@Internal
+public class DataBatchSinkFunctionOperator extends StreamSink<Event> {

Review Comment:
```suggestion
public class BatchDataSinkFunctionOperator extends StreamSink<Event> {
```


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org