lizhimins commented on code in PR #992: URL: https://github.com/apache/rocketmq-clients/pull/992#discussion_r2097772707
########## java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java: ########## @@ -188,18 +195,52 @@ protected void startUp() throws Exception { } } + /** + * PushConsumerImpl shutdown order + * 1. when begin shutdown, do not send any new receive request + * 2. cancel scanAssignmentsFuture, do not create new processQueue + * 3. waiting all inflight receive request finished or timeout + * 4. shutdown consumptionExecutor and waiting all message consumption finished + * 5. sleep 1s to ack message async + * 6. shutdown clientImpl + */ @Override protected void shutDown() throws InterruptedException { log.info("Begin to shutdown the rocketmq push consumer, clientId={}", clientId); if (null != scanAssignmentsFuture) { scanAssignmentsFuture.cancel(false); } - super.shutDown(); + log.info("Waiting for the inflight receive requests to be finished, clientId={}", clientId); + waitingReceiveRequestFinished(); + log.info("Begin to Shutdown consumption executor, clientId={}", clientId); this.consumptionExecutor.shutdown(); ExecutorServices.awaitTerminated(consumptionExecutor); + TimeUnit.SECONDS.sleep(1); + super.shutDown(); log.info("Shutdown the rocketmq push consumer successfully, clientId={}", clientId); } + private void waitingReceiveRequestFinished() { + Duration maxWaitingTime = clientConfiguration.getRequestTimeout() + .plus(pushSubscriptionSettings.getLongPollingTimeout()); + try { + CompletableFuture.runAsync(() -> { + while (inflightRequestCountInterceptor.getInflightReceiveRequestCount() > 0) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }).get(maxWaitingTime.toMillis(), TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { Review Comment: try { while (inflightRequestCountInterceptor.getInflightReceiveRequestCount() > 0) { if (System.currentTimeMillis() > endTime) { log.warn("Timeout waiting for the inflight receive requests to be finished, clientId={}", clientId); return; } Thread.sleep(100); // 休眠100毫秒 } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("Interrupted while waiting for the inflight receive requests to be finished, clientId={}", clientId, e); } catch (Exception e) { log.error("Unexpected exception while waiting for the inflight receive requests to be finished, clientId={}", clientId, e); } -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org