This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 170d49f3363 Add PrepareJobWithGetBinlogPositionException (#21040)
170d49f3363 is described below

commit 170d49f33635001dd46465ca9d7fe8f4e899caeb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 18 00:57:35 2022 +0800

    Add PrepareJobWithGetBinlogPositionException (#21040)
---
 .../user-manual/error-code/sql-error-code.cn.md    |  5 ++--
 .../user-manual/error-code/sql-error-code.en.md    |  5 ++--
 .../BinlogSyncChannelAlreadyClosedException.java   |  2 +-
 .../job/PipelineJobExecutionException.java         |  2 +-
 ... PrepareJobWithGetBinlogPositionException.java} | 12 +++++----
 .../scenario/migration/MigrationJobPreparer.java   | 30 +++++++++++-----------
 6 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md 
b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 696a8d68f2f..800e627a87e 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -103,8 +103,9 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | 08000     | 18088       | Check privileges failed on source data source, 
reason is: %s |
 | 08000     | 18089       | Data sources can not connect, reason is: %s |
 | HY000     | 18090       | Importer job write data failed |
-| HY000     | 18091       | Can not poll event because of binlog sync channel 
already closed |
-| HY000     | 18092       | Task \`%s\` execute failed |
+| 08000     | 18091       | Get binlog position failed by job \`%s\`, reason 
is: %s |
+| HY000     | 18093       | Can not poll event because of binlog sync channel 
already closed |
+| HY000     | 18094       | Task \`%s\` execute failed |
 
 ## 功能异常
 
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md 
b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 34e7a05e089..836ac81e0ab 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -103,8 +103,9 @@ SQL error codes provide by standard `SQL State`, `Vendor 
Code` and `Reason`, whi
 | 08000     | 18088       | Check privileges failed on source data source, 
reason is: %s |
 | 08000     | 18089       | Data sources can not connect, reason is: %s |
 | HY000     | 18090       | Importer job write data failed |
-| HY000     | 18091       | Can not poll event because of binlog sync channel 
already closed |
-| HY000     | 18092       | Task \`%s\` execute failed |
+| 08000     | 18091       | Get binlog position failed by job \`%s\`, reason 
is: %s |
+| HY000     | 18093       | Can not poll event because of binlog sync channel 
already closed |
+| HY000     | 18094       | Task \`%s\` execute failed |
 
 ## Feature Exception
 
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
index 284256a42e6..119eb138436 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
@@ -28,6 +28,6 @@ public final class BinlogSyncChannelAlreadyClosedException 
extends PipelineSQLEx
     private static final long serialVersionUID = -8897293295641185703L;
     
     public BinlogSyncChannelAlreadyClosedException() {
-        super(XOpenSQLState.GENERAL_ERROR, 91, "Can not poll event because of 
binlog sync channel already closed");
+        super(XOpenSQLState.GENERAL_ERROR, 93, "Can not poll event because of 
binlog sync channel already closed");
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
index 13d2df77bb5..a73e812ac52 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
@@ -28,6 +28,6 @@ public final class PipelineJobExecutionException extends 
PipelineSQLException {
     private static final long serialVersionUID = -5530453461378051166L;
     
     public PipelineJobExecutionException(final String taskId, final Throwable 
cause) {
-        super(XOpenSQLState.GENERAL_ERROR, 92, "Task `%s` execute failed", 
taskId, cause.getMessage());
+        super(XOpenSQLState.GENERAL_ERROR, 94, "Task `%s` execute failed", 
taskId, cause.getMessage());
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
similarity index 69%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
index 284256a42e6..7eb4bcb73f7 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
@@ -20,14 +20,16 @@ package 
org.apache.shardingsphere.data.pipeline.core.exception.job;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
+import java.sql.SQLException;
+
 /**
- * Binlog sync channel already closed exception.
+ * Prepare job with get binlog position exception.
  */
-public final class BinlogSyncChannelAlreadyClosedException extends 
PipelineSQLException {
+public final class PrepareJobWithGetBinlogPositionException extends 
PipelineSQLException {
     
-    private static final long serialVersionUID = -8897293295641185703L;
+    private static final long serialVersionUID = -3701189611685636704L;
     
-    public BinlogSyncChannelAlreadyClosedException() {
-        super(XOpenSQLState.GENERAL_ERROR, 91, "Can not poll event because of 
binlog sync channel already closed");
+    public PrepareJobWithGetBinlogPositionException(final String jobId, final 
SQLException cause) {
+        super(XOpenSQLState.CONNECTION_EXCEPTION, 90, "Get binlog position 
failed by job `%s`, reason is: %s", jobId, cause.getMessage());
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index 48071fee418..93776d3ae2e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -29,7 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
@@ -71,20 +71,16 @@ public final class MigrationJobPreparer {
             PipelineJobCenter.stop(jobItemContext.getJobId());
         }
         // TODO check metadata
-        try {
-            if 
(PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType()))
 {
-                initIncrementalTasks(jobItemContext);
-                if (jobItemContext.isStopping()) {
-                    PipelineJobCenter.stop(jobItemContext.getJobId());
-                }
+        if 
(PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType()))
 {
+            initIncrementalTasks(jobItemContext);
+            if (jobItemContext.isStopping()) {
+                PipelineJobCenter.stop(jobItemContext.getJobId());
             }
-            initInventoryTasks(jobItemContext);
-            log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, 
incrementalTasks={}",
-                    jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), 
jobItemContext.getIncrementalTasks());
-        } catch (final SQLException ex) {
-            log.error("job preparing failed, jobId={}", 
jobItemContext.getJobId());
-            throw new PipelineJobPrepareFailedException("job preparing failed, 
jobId=" + jobItemContext.getJobId(), ex);
         }
+        initInventoryTasks(jobItemContext);
+        log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, 
incrementalTasks={}",
+                jobItemContext.getJobId(), jobItemContext.getShardingItem(), 
jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
+        
     }
     
     private void prepareAndCheckTargetWithLock(final MigrationJobItemContext 
jobItemContext) throws SQLException {
@@ -154,13 +150,17 @@ public final class MigrationJobPreparer {
         
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
     }
     
-    private void initIncrementalTasks(final MigrationJobItemContext 
jobItemContext) throws SQLException {
+    private void initIncrementalTasks(final MigrationJobItemContext 
jobItemContext) {
         PipelineChannelCreator pipelineChannelCreator = 
jobItemContext.getJobProcessContext().getPipelineChannelCreator();
         ExecuteEngine incrementalDumperExecuteEngine = 
jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         PipelineDataSourceManager dataSourceManager = 
jobItemContext.getDataSourceManager();
         JobItemIncrementalTasksProgress initIncremental = null == 
jobItemContext.getInitProgress() ? null : 
jobItemContext.getInitProgress().getIncremental();
-        
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
 taskConfig.getDumperConfig(), dataSourceManager));
+        try {
+            
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
 taskConfig.getDumperConfig(), dataSourceManager));
+        } catch (final SQLException ex) {
+            throw new 
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
+        }
         PipelineTableMetaDataLoader sourceMetaDataLoader = 
jobItemContext.getSourceMetaDataLoader();
         IncrementalTask incrementalTask = new 
IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
                 taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), 
pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, 
incrementalDumperExecuteEngine, jobItemContext);

Reply via email to