sandynz commented on code in PR #21065:
URL: https://github.com/apache/shardingsphere/pull/21065#discussion_r974002420


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java:
##########
@@ -41,4 +41,6 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
     private JobItemIncrementalTasksProgress incremental;
     
     private long processedRecordsCount;
+    
+    private transient String errorMsg;

Review Comment:
   All occurrences of `errorMsg` could be renamed to `errorMessage`.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java:
##########
@@ -83,4 +83,22 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI
      * @return job configuration
      */
     PipelineJobConfiguration getJobConfiguration(String jobId);
+    
+    /**
+     * Get job item error msg.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return map, key is sharding item, value is error msg
+     */
+    String getJobItemErrorMsg(String jobId, int shardingItem);
+    
+    /**
+     * Persist job item error msg.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param errorMsg error msg
+     */
+    void persistJobItemErrorMsg(String jobId, int shardingItem, Throwable errorMsg);

Review Comment:
   The type of the 3rd parameter could be `Object`, since the value might be a `Throwable` or a `String`.
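
   A minimal sketch of how an `Object` parameter might be turned into the persisted text. `ErrorMessageSketch` and `toErrorMessage` are hypothetical names, and the stack trace is rendered with the JDK's `printStackTrace` rather than the `ExceptionUtils.getStackTrace` call used in the PR, only so that the example has no dependencies.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper, not part of ShardingSphere.
final class ErrorMessageSketch {
    
    private ErrorMessageSketch() {
    }
    
    // Accepts null, a Throwable, or any other object (typically a String).
    static String toErrorMessage(final Object error) {
        if (null == error) {
            return "";
        }
        if (error instanceof Throwable) {
            // Render the full stack trace into a String.
            StringWriter writer = new StringWriter();
            ((Throwable) error).printStackTrace(new PrintWriter(writer, true));
            return writer.toString();
        }
        return error.toString();
    }
}
```

   With this shape, callers that only have a message can pass a `String`, and callers that caught an exception can pass it directly.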



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java:
##########
@@ -492,4 +496,19 @@ private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfigur
         result.setParameter(parameter);
         return result;
     }
+    
+    @Override
+    public String getJobItemErrorMsg(final String jobId, final int shardingItem) {
+        return ObjectUtils.defaultIfNull(PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemErrorMsg(jobId, shardingItem), "");
+    }
+    
+    @Override
+    public void persistJobItemErrorMsg(final String jobId, final int shardingItem, final Throwable throwable) {
+        String key = PipelineMetaDataNode.getJobItemErrorMsgPath(jobId, shardingItem);
+        String value = "";
+        if (null != throwable) {
+            value = ExceptionUtils.getStackTrace(throwable);
+        }
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persist(key, value);
+    }

Review Comment:
   It could be moved to `AbstractPipelineJobAPIImpl` for common usage.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java:
##########
@@ -43,6 +45,8 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
     
     private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
     
+    private final MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
+    

Review Comment:
   `InventoryIncrementalTasksRunner` is for common usage, so `MigrationJobAPI` should not be used here.
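
   One possible shape, sketched under assumptions: the runner receives a scenario-neutral API through its constructor instead of looking up the migration-specific one. `JobItemErrorApiSketch` and `CommonTasksRunnerSketch` are hypothetical stand-ins, not existing ShardingSphere types.

```java
// Hypothetical scenario-neutral contract; in the PR the equivalent methods
// are declared on PipelineJobAPI.
interface JobItemErrorApiSketch {
    
    void persistJobItemErrorMessage(String jobId, int shardingItem, Object error);
}

// Hypothetical stand-in for a runner shared by all pipeline scenarios.
final class CommonTasksRunnerSketch {
    
    private final JobItemErrorApiSketch jobAPI;
    
    private final String jobId;
    
    private final int shardingItem;
    
    CommonTasksRunnerSketch(final JobItemErrorApiSketch jobAPI, final String jobId, final int shardingItem) {
        this.jobAPI = jobAPI;
        this.jobId = jobId;
        this.shardingItem = shardingItem;
    }
    
    // The runner only knows the neutral contract, never a migration class.
    void onFailure(final Throwable throwable) {
        jobAPI.persistJobItemErrorMessage(jobId, shardingItem, throwable);
    }
}
```

   Each scenario would then supply its own implementation when it creates the runner.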



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java:
##########
@@ -151,15 +157,17 @@ private synchronized void executeIncrementalTask() {
     
     private ExecuteCallback createIncrementalTaskCallback() {
         return new ExecuteCallback() {
-            
+    
             @Override
             public void onSuccess() {
+                jobAPI.persistJobItemErrorMsg(jobItemContext.getJobId(), jobItemContext.getShardingItem(), null);
             }

Review Comment:
   Could we just delete the node?
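
   A rough sketch of the difference, assuming a key-value repository with `persist` and `delete` operations. `InMemoryRepositorySketch`, `JobItemErrorStoreSketch`, and the path layout are hypothetical and only illustrate removing the node on success instead of writing an empty value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the governance repository.
final class InMemoryRepositorySketch {
    
    private final Map<String, String> nodes = new ConcurrentHashMap<>();
    
    void persist(final String key, final String value) {
        nodes.put(key, value);
    }
    
    void delete(final String key) {
        nodes.remove(key);
    }
    
    String get(final String key) {
        return nodes.get(key);
    }
}

final class JobItemErrorStoreSketch {
    
    private final InMemoryRepositorySketch repository;
    
    JobItemErrorStoreSketch(final InMemoryRepositorySketch repository) {
        this.repository = repository;
    }
    
    void persistError(final String jobId, final int shardingItem, final String errorMessage) {
        repository.persist(errorPath(jobId, shardingItem), errorMessage);
    }
    
    // On success, remove the node rather than persisting an empty string.
    void cleanError(final String jobId, final int shardingItem) {
        repository.delete(errorPath(jobId, shardingItem));
    }
    
    // A missing node reads back as an empty message.
    String getError(final String jobId, final int shardingItem) {
        String value = repository.get(errorPath(jobId, shardingItem));
        return null == value ? "" : value;
    }
    
    // Assumed path layout, for illustration only.
    private static String errorPath(final String jobId, final int shardingItem) {
        return "/sketch/jobs/" + jobId + "/items/" + shardingItem + "/error";
    }
}
```

   Readers still get an empty string when no error is stored, so callers would not need to change.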



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java:
##########
@@ -151,15 +157,17 @@ private synchronized void executeIncrementalTask() {
     
     private ExecuteCallback createIncrementalTaskCallback() {
         return new ExecuteCallback() {
-            
+    
             @Override
             public void onSuccess() {
+                jobAPI.persistJobItemErrorMsg(jobItemContext.getJobId(), jobItemContext.getShardingItem(), null);
             }
             
             @Override
             public void onFailure(final Throwable throwable) {
                 log.error("Incremental task execute failed.", throwable);
                 updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
+                jobAPI.persistJobItemErrorMsg(jobItemContext.getJobId(), jobItemContext.getShardingItem(), throwable);
                 stop();
             }

Review Comment:
   1. Is the error message persisted when `IncrementalTask` fails?
   
   2. Is the error message persisted when `MySQLClient` fails to reconnect?
   



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java:
##########
@@ -118,12 +122,14 @@ public void onSuccess() {
                     log.info("onSuccess, all inventory tasks finished.");
                     executeIncrementalTask();
                 }
+                jobAPI.persistJobItemErrorMsg(jobItemContext.getJobId(), jobItemContext.getShardingItem(), null);
             }

Review Comment:
   1. `executeIncrementalTask` is not blocking; do we need to clean the error message while the incremental task is still running?
   
   2. Is the error message cleaned when the job restarts?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
