sandynz commented on code in PR #21065:
URL: https://github.com/apache/shardingsphere/pull/21065#discussion_r974002420
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java:
##########
@@ -41,4 +41,6 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
private JobItemIncrementalTasksProgress incremental;
private long processedRecordsCount;
+
+ private transient String errorMsg;
Review Comment:
All occurrences of `errorMsg` could be renamed to `errorMessage`.
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java:
##########
@@ -83,4 +83,22 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI
* @return job configuration
*/
PipelineJobConfiguration getJobConfiguration(String jobId);
+
+ /**
+ * Get job item error msg.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return map, key is sharding item, value is error msg
+ */
+ String getJobItemErrorMsg(String jobId, int shardingItem);
+
+ /**
+ * Persist job item error msg.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param errorMsg error msg
+ */
+ void persistJobItemErrorMsg(String jobId, int shardingItem, Throwable errorMsg);
Review Comment:
The third parameter's type could be `Object`, since the value might be a `Throwable` or a `String`.
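To illustrate the suggestion, here is a minimal sketch of how an `Object` parameter could be normalized into text before it is persisted. `ErrorMessageFormatter` and `format` are hypothetical names and are not part of the PR or of ShardingSphere. The sketch uses only the JDK in place of the `ExceptionUtils.getStackTrace` call in the PR.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper (not ShardingSphere API): converts an error given as
// Object into the text that would be persisted for a job item.
final class ErrorMessageFormatter {

    private ErrorMessageFormatter() {
    }

    static String format(final Object error) {
        if (null == error) {
            // No error: an empty string, matching the behavior of the PR code.
            return "";
        }
        if (error instanceof Throwable) {
            // Throwable: render the full stack trace as text.
            StringWriter writer = new StringWriter();
            ((Throwable) error).printStackTrace(new PrintWriter(writer, true));
            return writer.toString();
        }
        // Anything else (typically a String): use its string form.
        return error.toString();
    }
}
```

With a helper like this, callers could pass either an exception or a ready-made message without the API needing two overloads.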
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java:
##########
@@ -492,4 +496,19 @@ private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfigur
result.setParameter(parameter);
return result;
}
+
+ @Override
+ public String getJobItemErrorMsg(final String jobId, final int shardingItem) {
+ return ObjectUtils.defaultIfNull(PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemErrorMsg(jobId, shardingItem), "");
+ }
+
+ @Override
+ public void persistJobItemErrorMsg(final String jobId, final int shardingItem, final Throwable throwable) {
+ String key = PipelineMetaDataNode.getJobItemErrorMsgPath(jobId, shardingItem);
+ String value = "";
+ if (null != throwable) {
+ value = ExceptionUtils.getStackTrace(throwable);
+ }
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persist(key, value);
+ }
Review Comment:
These methods could be moved to `AbstractPipelineJobAPIImpl` so that other job types can share them.
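A rough sketch of what that could look like, under stated assumptions: `KeyValueRepository`, `InMemoryRepository`, `AbstractJobApiSketch`, `MigrationJobApiSketch`, and the path layout in `errorPath` are all hypothetical stand-ins, not the real governance repository or `AbstractPipelineJobAPIImpl`. The only point is that the read and persist logic lives once in the base class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the governance repository; only get/persist are modeled.
interface KeyValueRepository {

    String get(String key);

    void persist(String key, String value);
}

// In-memory implementation so the sketch runs without a registry center.
final class InMemoryRepository implements KeyValueRepository {

    private final Map<String, String> data = new HashMap<>();

    @Override
    public String get(final String key) {
        return data.get(key);
    }

    @Override
    public void persist(final String key, final String value) {
        data.put(key, value);
    }
}

// Hypothetical base class: error message handling shared by all job types.
abstract class AbstractJobApiSketch {

    private final KeyValueRepository repository;

    protected AbstractJobApiSketch(final KeyValueRepository repository) {
        this.repository = repository;
    }

    public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
        String value = repository.get(errorPath(jobId, shardingItem));
        return null == value ? "" : value;
    }

    public void persistJobItemErrorMessage(final String jobId, final int shardingItem, final String errorMessage) {
        repository.persist(errorPath(jobId, shardingItem), null == errorMessage ? "" : errorMessage);
    }

    // Assumed path layout, for illustration only.
    private static String errorPath(final String jobId, final int shardingItem) {
        return "/jobs/" + jobId + "/items/" + shardingItem + "/error";
    }
}

// A concrete job type inherits the behavior without redefining it.
final class MigrationJobApiSketch extends AbstractJobApiSketch {

    MigrationJobApiSketch(final KeyValueRepository repository) {
        super(repository);
    }
}
```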
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java:
##########
@@ -43,6 +45,8 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
+ private final MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
+
Review Comment:
`InventoryIncrementalTasksRunner` is shared across job types, so `MigrationJobAPI` should not be used here.
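One possible way to keep the runner independent of the migration scenario, sketched with hypothetical names (`JobItemErrorReporter` and `TasksRunnerSketch` are illustrative only, and the real runner has far more state): the runner receives a job-type-agnostic callback instead of looking up a migration-specific API itself.

```java
// Hypothetical job-type-agnostic contract; not an existing ShardingSphere interface.
interface JobItemErrorReporter {

    void report(String jobId, int shardingItem, Throwable cause);
}

// Simplified stand-in for a shared tasks runner. It only knows the contract above,
// so any job type can supply its own implementation.
final class TasksRunnerSketch {

    private final String jobId;

    private final int shardingItem;

    private final JobItemErrorReporter errorReporter;

    TasksRunnerSketch(final String jobId, final int shardingItem, final JobItemErrorReporter errorReporter) {
        this.jobId = jobId;
        this.shardingItem = shardingItem;
        this.errorReporter = errorReporter;
    }

    // Called when a task fails; delegates persistence to whoever created the runner.
    void onFailure(final Throwable cause) {
        errorReporter.report(jobId, shardingItem, cause);
    }
}
```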
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java:
##########
@@ -151,15 +157,17 @@ private synchronized void executeIncrementalTask() {
private ExecuteCallback createIncrementalTaskCallback() {
return new ExecuteCallback() {
-
+
@Override
public void onSuccess() {
+ jobAPI.persistJobItemErrorMsg(jobItemContext.getJobId(), jobItemContext.getShardingItem(), null);
}
Review Comment:
Could we just delete the node instead of persisting an empty value?
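For comparison, a minimal sketch of the delete-based alternative, using a hypothetical in-memory `ErrorNodeStore` rather than the real governance repository (whose delete method is not shown in this diff). Success removes the node, and a missing node reads as an empty message.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of error nodes, keyed by path.
final class ErrorNodeStore {

    private final Map<String, String> nodes = new ConcurrentHashMap<>();

    // On failure: create or overwrite the node with the error text.
    void persistError(final String path, final String message) {
        nodes.put(path, message);
    }

    // On success: remove the node instead of writing an empty value.
    void clearError(final String path) {
        nodes.remove(path);
    }

    boolean exists(final String path) {
        return nodes.containsKey(path);
    }

    // Readers treat a missing node as "no error".
    String read(final String path) {
        return nodes.getOrDefault(path, "");
    }
}
```

The trade-off is that readers must tolerate a missing node, which the `defaultIfNull` call in `getJobItemErrorMsg` already appears to do.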
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java:
##########
@@ -151,15 +157,17 @@ private synchronized void executeIncrementalTask() {
private ExecuteCallback createIncrementalTaskCallback() {
return new ExecuteCallback() {
-
+
@Override
public void onSuccess() {
+ jobAPI.persistJobItemErrorMsg(jobItemContext.getJobId(), jobItemContext.getShardingItem(), null);
}
@Override
public void onFailure(final Throwable throwable) {
log.error("Incremental task execute failed.", throwable);
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
+ jobAPI.persistJobItemErrorMsg(jobItemContext.getJobId(), jobItemContext.getShardingItem(), throwable);
stop();
}
Review Comment:
1. Is the error message persisted when an `IncrementalTask` fails?
2. Is the error message persisted when `MySQLClient` fails to reconnect?
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java:
##########
@@ -118,12 +122,14 @@ public void onSuccess() {
log.info("onSuccess, all inventory tasks finished.");
executeIncrementalTask();
}
+ jobAPI.persistJobItemErrorMsg(jobItemContext.getJobId(), jobItemContext.getShardingItem(), null);
}
Review Comment:
1. `executeIncrementalTask` is not blocking. Do we need to clear the error message while the incremental task is still running?
2. Is the error message cleared when the job restarts?