fuyou001 commented on code in PR #8916: URL: https://github.com/apache/rocketmq/pull/8916#discussion_r1841684301
########## proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java: ########## @@ -98,22 +99,44 @@ public long getOffset() { public static class HandleData { private final Semaphore semaphore = new Semaphore(1); + private final AtomicLong lastLockTimeMs = new AtomicLong(-1L); private volatile boolean needRemove = false; private volatile MessageReceiptHandle messageReceiptHandle; public HandleData(MessageReceiptHandle messageReceiptHandle) { this.messageReceiptHandle = messageReceiptHandle; } - public boolean lock(long timeoutMs) { + public Long lock(long timeoutMs) { try { - return this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS); + boolean result = this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS); + long currentTimeMs = System.currentTimeMillis(); + if (result) { + this.lastLockTimeMs.set(currentTimeMs); + return currentTimeMs; + } else { + // if the lock is expired, can be acquired again + long expiredTimeMs = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3; + if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { + synchronized (this) { + if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { + this.lastLockTimeMs.set(currentTimeMs); + return currentTimeMs; + } + } + } + } + return null; } catch (InterruptedException e) { - return false; + return null; } } - public void unlock() { + public void unlock(long lockTimeMs) { + // if the lock is expired, we don't need to unlock it + if (System.currentTimeMillis() - lockTimeMs > ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 2) { Review Comment: add log ########## proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java: ########## @@ -98,22 +99,44 @@ public long getOffset() { public static class HandleData { private final Semaphore semaphore = new Semaphore(1); + private final AtomicLong lastLockTimeMs = new AtomicLong(-1L); private volatile boolean needRemove = false; private volatile MessageReceiptHandle messageReceiptHandle; public HandleData(MessageReceiptHandle messageReceiptHandle) { this.messageReceiptHandle = messageReceiptHandle; } - public boolean lock(long timeoutMs) { + public Long lock(long timeoutMs) { try { - return this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS); + boolean result = this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS); + long currentTimeMs = System.currentTimeMillis(); + if (result) { + this.lastLockTimeMs.set(currentTimeMs); + return currentTimeMs; + } else { + // if the lock is expired, can be acquired again + long expiredTimeMs = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3; + if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { + synchronized (this) { Review Comment: add log -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org