ruanhang1993 commented on code in PR #3812:
URL: https://github.com/apache/flink-cdc/pull/3812#discussion_r2032935652


##########
docs/content.zh/docs/core-concept/data-pipeline.md:
##########
@@ -115,4 +115,5 @@ under the License.
 |-----------------|---------------------------------------|-------------------|
 | name            | The name of this pipeline, used as the job name in the Flink cluster. | optional          |
 | parallelism     | The global parallelism of the pipeline; defaults to 1.                | optional          |
-| local-time-zone | The job-level local time zone.                                         | optional          |
\ No newline at end of file
+| local-time-zone | The job-level local time zone.                                         | optional          |
+| batch-mode.enabled | Synchronizes only the current snapshot data, in batch mode.        | optional          |

Review Comment:
   I think it is better to use an option `runtime-mode` of enum type, with a default value of `STREAMING`, like `RuntimeExecutionMode` in Flink.
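   As a rough sketch of this suggestion (the `RuntimeMode` name and its values are illustrative assumptions, not something already in this PR):
   ```java
   // Hypothetical enum mirroring Flink's RuntimeExecutionMode; the name and
   // values are illustrative only.
   public enum RuntimeMode {
       // Synchronize the snapshot and then keep consuming incremental changes.
       STREAMING,
       // Synchronize only the current snapshot data, then finish the job.
       BATCH
   }
   ```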



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java:
##########
@@ -95,4 +96,35 @@ private TableId resolveReplacement(
         }
         return TableId.parse(route.f1);
     }
+
+    /*
+     * groupSourceTablesByRouteRule
+     *
+     * @param: tableIdSet
+     * @return: java.util.List<java.util.Set<org.apache.flink.cdc.common.event.TableId>>
+     *
+     * Group the source tables that conform to the same routing rule together.
+     * The total number of groups is less than or equal to the number of routing rules.
+     * For the source tables within each group, their table structures will be merged to obtain
+     * the widest table structure in that group. The structures of all tables within the group
+     * will be expanded to this widest table structure.
+     */

Review Comment:
   ```suggestion
       /**
        * Group the source tables that conform to the same routing rule together.
        * The total number of groups is less than or equal to the number of routing rules.
        * For the source tables within each group, their table structures will be merged to obtain
        * the widest table structure in that group. The structures of all tables within the group
        * will be expanded to this widest table structure.
        *
        * @param tableIdSet The tables that need to be grouped by the router
        * @return The tables grouped by the router
        */
   ```
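   For illustration, the grouping described above could be sketched like this (a sketch only, assuming the router keeps its rules as `Tuple3<Selectors, String, String>` entries; this is not the PR's actual implementation):
   ```java
   // Sketch: collect, per routing rule, the set of source tables it matches.
   // A table matching several rules lands in several groups, so the number of
   // non-empty groups is at most the number of routing rules.
   private List<Set<TableId>> groupSourceTablesByRouteRule(Set<TableId> tableIdSet) {
       List<Set<TableId>> groups = new ArrayList<>();
       for (Tuple3<Selectors, String, String> route : routes) {
           Set<TableId> group =
                   tableIdSet.stream()
                           .filter(tableId -> route.f0.isMatch(tableId))
                           .collect(Collectors.toSet());
           if (!group.isEmpty()) {
               groups.add(group);
           }
       }
       return groups;
   }
   ```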



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaManager.java:
##########
@@ -73,6 +73,12 @@ public SchemaManager() {
         behavior = SchemaChangeBehavior.EVOLVE;

Review Comment:
   ```suggestion
           this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), SchemaChangeBehavior.EVOLVE);
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataBatchSinkWriterOperator.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.sink;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+
+/**
+ * An operator that processes records to be written into a {@link Sink} in batch mode.
+ *
+ * <p>The operator is a proxy of SinkWriterOperator in Flink.
+ *
+ * <p>The operator is always part of a sink pipeline and is the first operator.
+ *
+ * @param <CommT> the type of the committable (to send to downstream operators)
+ */
+@Internal
+public class DataBatchSinkWriterOperator<CommT>

Review Comment:
   ```suggestion
   public class BatchDataSinkWriterOperator<CommT>
   ```



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/DataSourceFactory.java:
##########
@@ -28,4 +28,10 @@ public interface DataSourceFactory extends Factory {
 
     /** Creates a {@link DataSource} instance. */
     DataSource createDataSource(Context context);
+
+    /** Checks whether this {@link DataSource} can be created in batch mode. */
+    default void verifyBatchMode(Context context) {

Review Comment:
   ```suggestion
       default void verifyRuntimeMode(Context context, RuntimeMode runtimeMode) {
   ```
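   To illustrate how a connector might use the suggested hook (hypothetical; `RuntimeMode` is the assumed enum from the earlier comment, not an existing class):
   ```java
   // Hypothetical override in a connector that cannot run in batch mode yet;
   // RuntimeMode is an assumed enum, not part of the current codebase.
   @Override
   public void verifyRuntimeMode(Context context, RuntimeMode runtimeMode) {
       if (runtimeMode == RuntimeMode.BATCH) {
           throw new IllegalArgumentException(
                   "This source does not support batch mode yet.");
       }
   }
   ```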



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaManager.java:
##########
@@ -73,6 +73,12 @@ public SchemaManager() {
         behavior = SchemaChangeBehavior.EVOLVE;
     }
 
+    public SchemaManager(SchemaChangeBehavior schemaChangeBehavior) {
+        evolvedSchemas = new ConcurrentHashMap<>();
+        originalSchemas = new ConcurrentHashMap<>();
+        behavior = schemaChangeBehavior;

Review Comment:
   ```suggestion
           this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), schemaChangeBehavior);
   ```
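   Taken together with the suggestion on the no-arg constructor above, both constructors would delegate to one place (a sketch, assuming a three-argument constructor taking both schema maps and a behavior already exists):
   ```java
   // Sketch of the delegating constructors after applying both suggestions;
   // assumes SchemaManager(Map, Map, SchemaChangeBehavior) exists.
   public SchemaManager() {
       this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), SchemaChangeBehavior.EVOLVE);
   }

   public SchemaManager(SchemaChangeBehavior schemaChangeBehavior) {
       this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), schemaChangeBehavior);
   }
   ```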



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##########
@@ -45,6 +45,12 @@ public class PipelineOptions {
                     .defaultValue(1)
                     .withDescription("Parallelism of the pipeline");
 
+    public static final ConfigOption<Boolean> PIPELINE_BATCH_MODE_ENABLED =
+            ConfigOptions.key("batch-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Run pipeline job in batch mode instead of streaming mode");

Review Comment:
   ```java
   public static final ConfigOption<RuntimeMode> PIPELINE_RUNTIME_MODE =
               ConfigOptions.key("runtime-mode")
                       .enumType(RuntimeMode.class)
                       .defaultValue(RuntimeMode.STREAMING)
                       .withDescription("xxx");
   ```
   Maybe we could reuse Flink's `RuntimeExecutionMode` instead of introducing a new enum.
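   If Flink's enum were reused, the option could look like the sketch below (assuming the CDC `ConfigOptions` builder exposes the same `enumType(...)` API as Flink's):
   ```java
   import org.apache.flink.api.common.RuntimeExecutionMode;

   // Sketch reusing Flink's RuntimeExecutionMode instead of a new enum;
   // the description text is illustrative only.
   public static final ConfigOption<RuntimeExecutionMode> PIPELINE_RUNTIME_MODE =
           ConfigOptions.key("runtime-mode")
                   .enumType(RuntimeExecutionMode.class)
                   .defaultValue(RuntimeExecutionMode.STREAMING)
                   .withDescription(
                           "Whether the pipeline job runs in STREAMING or BATCH mode.");
   ```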



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionBatchOperator.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.partitioning;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Operator for processing events from {@link SchemaOperator} before {@link EventPartitioner} with
+ * regular topology in batch mode.
+ */
+@Internal
+public class RegularPrePartitionBatchOperator extends AbstractStreamOperator<PartitioningEvent>

Review Comment:
   ```suggestion
   public class BatchRegularPrePartitionOperator extends AbstractStreamOperator<PartitioningEvent>
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataBatchSinkFunctionOperator.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.sink;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * An operator that processes records to be written into a {@link SinkFunction} in batch mode.
+ *
+ * <p>The operator is a proxy of {@link StreamSink} in Flink.
+ *
+ * <p>The operator is always part of a sink pipeline and is the first operator.
+ */
+@Internal
+public class DataBatchSinkFunctionOperator extends StreamSink<Event> {

Review Comment:
   ```suggestion
   public class BatchDataSinkFunctionOperator extends StreamSink<Event> {
   ```


