lizhimins commented on code in PR #992:
URL: https://github.com/apache/rocketmq-clients/pull/992#discussion_r2097772707


##########
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java:
##########
@@ -188,18 +195,52 @@ protected void startUp() throws Exception {
         }
     }
 
+    /**
+     * PushConsumerImpl shutdown order:
+     * 1. once shutdown begins, do not send any new receive requests
+     * 2. cancel scanAssignmentsFuture so that no new processQueue is created
+     * 3. wait for all inflight receive requests to finish or time out
+     * 4. shut down consumptionExecutor and wait for all message consumption to finish
+     * 5. sleep 1s to let messages be acked asynchronously
+     * 6. shut down clientImpl
+     */
     @Override
     protected void shutDown() throws InterruptedException {
         log.info("Begin to shutdown the rocketmq push consumer, clientId={}", 
clientId);
         if (null != scanAssignmentsFuture) {
             scanAssignmentsFuture.cancel(false);
         }
-        super.shutDown();
+        log.info("Waiting for the inflight receive requests to be finished, 
clientId={}", clientId);
+        waitingReceiveRequestFinished();
+        log.info("Begin to Shutdown consumption executor, clientId={}", 
clientId);
         this.consumptionExecutor.shutdown();
         ExecutorServices.awaitTerminated(consumptionExecutor);
+        TimeUnit.SECONDS.sleep(1);
+        super.shutDown();
         log.info("Shutdown the rocketmq push consumer successfully, 
clientId={}", clientId);
     }
 
+    private void waitingReceiveRequestFinished() {
+        Duration maxWaitingTime = clientConfiguration.getRequestTimeout()
+            .plus(pushSubscriptionSettings.getLongPollingTimeout());
+        try {
+            CompletableFuture.runAsync(() -> {
+                while 
(inflightRequestCountInterceptor.getInflightReceiveRequestCount() > 0) {
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }).get(maxWaitingTime.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {

Review Comment:
       try {
           while 
(inflightRequestCountInterceptor.getInflightReceiveRequestCount() > 0) {
               if (System.currentTimeMillis() > endTime) {
                   log.warn("Timeout waiting for the inflight receive requests 
to be finished, clientId={}", clientId);
                   return;
               }
               Thread.sleep(100);  // sleep for 100 milliseconds
           }
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           log.error("Interrupted while waiting for the inflight receive 
requests to be finished, clientId={}", clientId, e);
       } catch (Exception e) {
           log.error("Unexpected exception while waiting for the inflight 
receive requests to be finished, clientId={}", clientId, e);
       }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to