FuYouJ commented on code in PR #4841:
URL: https://github.com/apache/seatunnel/pull/4841#discussion_r1218996318
##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java:
##########
@@ -65,22 +68,78 @@ public String getPluginName() {
@Override
public void prepare(Config config) throws PrepareFailException {
- neo4JSinkQueryInfo.setDriverBuilder(prepareDriver(config));
- final CheckResult queryConfigCheck =
+ // check username password query and init driver
+ DriverBuilder driverBuilder = prepareDriver(config);
+ neo4JSinkQueryInfo.setDriverBuilder(driverBuilder);
+ setNeo4jWriteMode(config);
+
+ if (neo4JSinkQueryInfo.batchMode()) {
+ prepareBatchModeConfigParams(config);
+ } else {
+ prepareWriteOneByOneConfigParams(config);
+ }
+ }
+
+ private void setNeo4jWriteMode(Config config) {
+ if (config.hasPath(MAX_BATCH_SIZE.key())) {
+ int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+ if (batchSize <= 0) {
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, PluginType.SINK, "maxBatchSize
must greater than 0"));
+ }
+ neo4JSinkQueryInfo.setMaxBatchSize(batchSize);
+ neo4JSinkQueryInfo.setWriteMode(SinkWriteMode.Batch);
+ } else {
+ neo4JSinkQueryInfo.setWriteMode(SinkWriteMode.OneByOne);
+ }
+ }
+
+ private void prepareWriteOneByOneConfigParams(Config config) {
+
+ CheckResult queryConfigCheck =
CheckConfigUtil.checkAllExists(config, KEY_QUERY.key(),
QUERY_PARAM_POSITION.key());
+
if (!queryConfigCheck.isSuccess()) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, PluginType.SINK,
queryConfigCheck.getMsg()));
}
+ // set query
neo4JSinkQueryInfo.setQuery(config.getString(KEY_QUERY.key()));
+ // set queryParamPosition
neo4JSinkQueryInfo.setQueryParamPosition(
config.getObject(QUERY_PARAM_POSITION.key()).unwrapped());
}
+ private void prepareBatchModeConfigParams(Config config) {
+
+ CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config,
KEY_QUERY.key());
+ if (!queryConfigCheck.isSuccess()) {
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, PluginType.SINK,
queryConfigCheck.getMsg()));
+ }
+
+ int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+ neo4JSinkQueryInfo.setMaxBatchSize(batchSize);
Review Comment:
according to your opinion, the code has been reconstructed, please review
again
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]