FuYouJ commented on code in PR #4841:
URL: https://github.com/apache/seatunnel/pull/4841#discussion_r1218996318


##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java:
##########
@@ -65,22 +68,78 @@ public String getPluginName() {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        neo4JSinkQueryInfo.setDriverBuilder(prepareDriver(config));
 
-        final CheckResult queryConfigCheck =
+        // check username password query and init driver
+        DriverBuilder driverBuilder = prepareDriver(config);
+        neo4JSinkQueryInfo.setDriverBuilder(driverBuilder);
+        setNeo4jWriteMode(config);
+
+        if (neo4JSinkQueryInfo.batchMode()) {
+            prepareBatchModeConfigParams(config);
+        } else {
+            prepareWriteOneByOneConfigParams(config);
+        }
+    }
+
+    private void setNeo4jWriteMode(Config config) {
+        if (config.hasPath(MAX_BATCH_SIZE.key())) {
+            int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+            if (batchSize <= 0) {
+                throw new Neo4jConnectorException(
+                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                        String.format(
+                                "PluginName: %s, PluginType: %s, Message: %s",
+                                PLUGIN_NAME, PluginType.SINK, "maxBatchSize 
must greater than 0"));
+            }
+            neo4JSinkQueryInfo.setMaxBatchSize(batchSize);
+            neo4JSinkQueryInfo.setWriteMode(SinkWriteMode.Batch);
+        } else {
+            neo4JSinkQueryInfo.setWriteMode(SinkWriteMode.OneByOne);
+        }
+    }
+
+    private void prepareWriteOneByOneConfigParams(Config config) {
+
+        CheckResult queryConfigCheck =
                 CheckConfigUtil.checkAllExists(config, KEY_QUERY.key(), 
QUERY_PARAM_POSITION.key());
+
         if (!queryConfigCheck.isSuccess()) {
             throw new Neo4jConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format(
                             "PluginName: %s, PluginType: %s, Message: %s",
                             PLUGIN_NAME, PluginType.SINK, 
queryConfigCheck.getMsg()));
         }
+        // set query
         neo4JSinkQueryInfo.setQuery(config.getString(KEY_QUERY.key()));
+        // set queryParamPosition
         neo4JSinkQueryInfo.setQueryParamPosition(
                 config.getObject(QUERY_PARAM_POSITION.key()).unwrapped());
     }
 
+    private void prepareBatchModeConfigParams(Config config) {
+
+        CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config, 
KEY_QUERY.key());
+        if (!queryConfigCheck.isSuccess()) {
+            throw new Neo4jConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            PLUGIN_NAME, PluginType.SINK, 
queryConfigCheck.getMsg()));
+        }
+
+        int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+        neo4JSinkQueryInfo.setMaxBatchSize(batchSize);

Review Comment:
   As you suggested, the code has been refactored. Please review it again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to