This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 170d49f3363 Add PrepareJobWithGetBinlogPositionException (#21040)
170d49f3363 is described below
commit 170d49f33635001dd46465ca9d7fe8f4e899caeb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 18 00:57:35 2022 +0800
Add PrepareJobWithGetBinlogPositionException (#21040)
---
.../user-manual/error-code/sql-error-code.cn.md | 5 ++--
.../user-manual/error-code/sql-error-code.en.md | 5 ++--
.../BinlogSyncChannelAlreadyClosedException.java | 2 +-
.../job/PipelineJobExecutionException.java | 2 +-
... PrepareJobWithGetBinlogPositionException.java} | 12 +++++----
.../scenario/migration/MigrationJobPreparer.java | 30 +++++++++++-----------
6 files changed, 30 insertions(+), 26 deletions(-)
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 696a8d68f2f..800e627a87e 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -103,8 +103,9 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| 08000 | 18088 | Check privileges failed on source data source,
reason is: %s |
| 08000 | 18089 | Data sources can not connect, reason is: %s |
| HY000 | 18090 | Importer job write data failed |
-| HY000 | 18091 | Can not poll event because of binlog sync channel
already closed |
-| HY000 | 18092 | Task \`%s\` execute failed |
+| 08000 | 18091 | Get binlog position failed by job \`%s\`, reason
is: %s |
+| HY000 | 18093 | Can not poll event because of binlog sync channel
already closed |
+| HY000 | 18094 | Task \`%s\` execute failed |
## 功能异常
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md
b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 34e7a05e089..836ac81e0ab 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -103,8 +103,9 @@ SQL error codes provide by standard `SQL State`, `Vendor
Code` and `Reason`, whi
| 08000 | 18088 | Check privileges failed on source data source,
reason is: %s |
| 08000 | 18089 | Data sources can not connect, reason is: %s |
| HY000 | 18090 | Importer job write data failed |
-| HY000 | 18091 | Can not poll event because of binlog sync channel
already closed |
-| HY000 | 18092 | Task \`%s\` execute failed |
+| 08000 | 18091 | Get binlog position failed by job \`%s\`, reason
is: %s |
+| HY000 | 18093 | Can not poll event because of binlog sync channel
already closed |
+| HY000 | 18094 | Task \`%s\` execute failed |
## Feature Exception
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
index 284256a42e6..119eb138436 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
@@ -28,6 +28,6 @@ public final class BinlogSyncChannelAlreadyClosedException
extends PipelineSQLEx
private static final long serialVersionUID = -8897293295641185703L;
public BinlogSyncChannelAlreadyClosedException() {
- super(XOpenSQLState.GENERAL_ERROR, 91, "Can not poll event because of
binlog sync channel already closed");
+ super(XOpenSQLState.GENERAL_ERROR, 93, "Can not poll event because of
binlog sync channel already closed");
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
index 13d2df77bb5..a73e812ac52 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
@@ -28,6 +28,6 @@ public final class PipelineJobExecutionException extends
PipelineSQLException {
private static final long serialVersionUID = -5530453461378051166L;
public PipelineJobExecutionException(final String taskId, final Throwable
cause) {
- super(XOpenSQLState.GENERAL_ERROR, 92, "Task `%s` execute failed",
taskId, cause.getMessage());
+ super(XOpenSQLState.GENERAL_ERROR, 94, "Task `%s` execute failed",
taskId, cause.getMessage());
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
similarity index 69%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
index 284256a42e6..7eb4bcb73f7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
@@ -20,14 +20,16 @@ package
org.apache.shardingsphere.data.pipeline.core.exception.job;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+import java.sql.SQLException;
+
/**
- * Binlog sync channel already closed exception.
+ * Prepare job with get binlog position exception.
*/
-public final class BinlogSyncChannelAlreadyClosedException extends
PipelineSQLException {
+public final class PrepareJobWithGetBinlogPositionException extends
PipelineSQLException {
- private static final long serialVersionUID = -8897293295641185703L;
+ private static final long serialVersionUID = -3701189611685636704L;
- public BinlogSyncChannelAlreadyClosedException() {
- super(XOpenSQLState.GENERAL_ERROR, 91, "Can not poll event because of
binlog sync channel already closed");
+ public PrepareJobWithGetBinlogPositionException(final String jobId, final
SQLException cause) {
+ super(XOpenSQLState.CONNECTION_EXCEPTION, 91, "Get binlog position
failed by job `%s`, reason is: %s", jobId, cause.getMessage());
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index 48071fee418..93776d3ae2e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -29,7 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
@@ -71,20 +71,16 @@ public final class MigrationJobPreparer {
PipelineJobCenter.stop(jobItemContext.getJobId());
}
// TODO check metadata
- try {
- if
(PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType()))
{
- initIncrementalTasks(jobItemContext);
- if (jobItemContext.isStopping()) {
- PipelineJobCenter.stop(jobItemContext.getJobId());
- }
+ if
(PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType()))
{
+ initIncrementalTasks(jobItemContext);
+ if (jobItemContext.isStopping()) {
+ PipelineJobCenter.stop(jobItemContext.getJobId());
}
- initInventoryTasks(jobItemContext);
- log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={},
incrementalTasks={}",
- jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(),
jobItemContext.getIncrementalTasks());
- } catch (final SQLException ex) {
- log.error("job preparing failed, jobId={}",
jobItemContext.getJobId());
- throw new PipelineJobPrepareFailedException("job preparing failed,
jobId=" + jobItemContext.getJobId(), ex);
}
+ initInventoryTasks(jobItemContext);
+ log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={},
incrementalTasks={}",
+ jobItemContext.getJobId(), jobItemContext.getShardingItem(),
jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
+
}
private void prepareAndCheckTargetWithLock(final MigrationJobItemContext
jobItemContext) throws SQLException {
@@ -154,13 +150,17 @@ public final class MigrationJobPreparer {
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
}
- private void initIncrementalTasks(final MigrationJobItemContext
jobItemContext) throws SQLException {
+ private void initIncrementalTasks(final MigrationJobItemContext
jobItemContext) {
PipelineChannelCreator pipelineChannelCreator =
jobItemContext.getJobProcessContext().getPipelineChannelCreator();
ExecuteEngine incrementalDumperExecuteEngine =
jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
PipelineDataSourceManager dataSourceManager =
jobItemContext.getDataSourceManager();
JobItemIncrementalTasksProgress initIncremental = null ==
jobItemContext.getInitProgress() ? null :
jobItemContext.getInitProgress().getIncremental();
-
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
taskConfig.getDumperConfig(), dataSourceManager));
+ try {
+
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
taskConfig.getDumperConfig(), dataSourceManager));
+ } catch (final SQLException ex) {
+ throw new
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
+ }
PipelineTableMetaDataLoader sourceMetaDataLoader =
jobItemContext.getSourceMetaDataLoader();
IncrementalTask incrementalTask = new
IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader,
incrementalDumperExecuteEngine, jobItemContext);