TyrantLucifer commented on code in PR #4841:
URL: https://github.com/apache/seatunnel/pull/4841#discussion_r1218013456


##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java:
##########
@@ -29,4 +29,17 @@ public class Neo4jSinkConfig extends Neo4jCommonConfig {
                     .noDefaultValue()
                     .withDescription(
                             "position mapping information for query 
parameters. key name is parameter placeholder name. associated value is 
position of field in input data row.");
+
+    public static final Option<Integer> MAX_BATCH_SIZE =
+            Options.key("maxBatchSize")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("neo4j write max batchSize");
+
+    public static final Option<String> BATCH_VARIABLE =
+            Options.key("batchVariable")

Review Comment:
   ```suggestion
               Options.key("batch_variable")
   ```



##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java:
##########
@@ -29,4 +29,17 @@ public class Neo4jSinkConfig extends Neo4jCommonConfig {
                     .noDefaultValue()
                     .withDescription(
                             "position mapping information for query 
parameters. key name is parameter placeholder name. associated value is 
position of field in input data row.");
+
+    public static final Option<Integer> MAX_BATCH_SIZE =
+            Options.key("maxBatchSize")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("neo4j write max batchSize");
+
+    public static final Option<String> BATCH_VARIABLE =
+            Options.key("batchVariable")
+                    .stringType()

Review Comment:
   ```suggestion
                       .enumType()
   ```



##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java:
##########
@@ -65,22 +68,78 @@ public String getPluginName() {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        neo4JSinkQueryInfo.setDriverBuilder(prepareDriver(config));
 
-        final CheckResult queryConfigCheck =
+        // check username password query and init driver
+        DriverBuilder driverBuilder = prepareDriver(config);
+        neo4JSinkQueryInfo.setDriverBuilder(driverBuilder);
+        setNeo4jWriteMode(config);
+
+        if (neo4JSinkQueryInfo.batchMode()) {
+            prepareBatchModeConfigParams(config);
+        } else {
+            prepareWriteOneByOneConfigParams(config);
+        }
+    }
+
+    private void setNeo4jWriteMode(Config config) {

Review Comment:
   I think add a temp variable save the write mode value is better.



##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java:
##########
@@ -29,4 +29,17 @@ public class Neo4jSinkConfig extends Neo4jCommonConfig {
                     .noDefaultValue()
                     .withDescription(
                             "position mapping information for query 
parameters. key name is parameter placeholder name. associated value is 
position of field in input data row.");
+
+    public static final Option<Integer> MAX_BATCH_SIZE =
+            Options.key("maxBatchSize")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("neo4j write max batchSize");
+
+    public static final Option<String> BATCH_VARIABLE =

Review Comment:
   ```suggestion
       public static final Option<String> WRITE_MODE =
   ```



##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java:
##########
@@ -65,22 +68,78 @@ public String getPluginName() {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        neo4JSinkQueryInfo.setDriverBuilder(prepareDriver(config));
 
-        final CheckResult queryConfigCheck =
+        // check username password query and init driver
+        DriverBuilder driverBuilder = prepareDriver(config);
+        neo4JSinkQueryInfo.setDriverBuilder(driverBuilder);
+        setNeo4jWriteMode(config);
+
+        if (neo4JSinkQueryInfo.batchMode()) {
+            prepareBatchModeConfigParams(config);
+        } else {
+            prepareWriteOneByOneConfigParams(config);
+        }
+    }
+
+    private void setNeo4jWriteMode(Config config) {
+        if (config.hasPath(MAX_BATCH_SIZE.key())) {
+            int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+            if (batchSize <= 0) {
+                throw new Neo4jConnectorException(
+                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                        String.format(
+                                "PluginName: %s, PluginType: %s, Message: %s",
+                                PLUGIN_NAME, PluginType.SINK, "maxBatchSize 
must greater than 0"));
+            }
+            neo4JSinkQueryInfo.setMaxBatchSize(batchSize);
+            neo4JSinkQueryInfo.setWriteMode(SinkWriteMode.Batch);
+        } else {
+            neo4JSinkQueryInfo.setWriteMode(SinkWriteMode.OneByOne);
+        }
+    }
+
+    private void prepareWriteOneByOneConfigParams(Config config) {
+
+        CheckResult queryConfigCheck =
                 CheckConfigUtil.checkAllExists(config, KEY_QUERY.key(), 
QUERY_PARAM_POSITION.key());
+
         if (!queryConfigCheck.isSuccess()) {
             throw new Neo4jConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format(
                             "PluginName: %s, PluginType: %s, Message: %s",
                             PLUGIN_NAME, PluginType.SINK, 
queryConfigCheck.getMsg()));
         }
+        // set query
         neo4JSinkQueryInfo.setQuery(config.getString(KEY_QUERY.key()));
+        // set queryParamPosition
         neo4JSinkQueryInfo.setQueryParamPosition(
                 config.getObject(QUERY_PARAM_POSITION.key()).unwrapped());
     }
 
+    private void prepareBatchModeConfigParams(Config config) {
+
+        CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config, 
KEY_QUERY.key());
+        if (!queryConfigCheck.isSuccess()) {
+            throw new Neo4jConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            PLUGIN_NAME, PluginType.SINK, 
queryConfigCheck.getMsg()));
+        }
+
+        int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+        neo4JSinkQueryInfo.setMaxBatchSize(batchSize);

Review Comment:
   This code snippets can be extracted into neo4JSinkQueryInfo, for more 
details you can refer to HttpParamters



##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java:
##########
@@ -29,4 +29,17 @@ public class Neo4jSinkConfig extends Neo4jCommonConfig {
                     .noDefaultValue()
                     .withDescription(
                             "position mapping information for query 
parameters. key name is parameter placeholder name. associated value is 
position of field in input data row.");
+
+    public static final Option<Integer> MAX_BATCH_SIZE =
+            Options.key("maxBatchSize")

Review Comment:
   ```suggestion
               Options.key("max_batch_size")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to