dragonls opened a new issue, #24697:
URL: https://github.com/apache/pulsar/issues/24697

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   Broker version: 3.0.7
   
   ### Issue Description
   
   We observed a case where a Pulsar consumer suddenly stops consuming messages 
(throughput drops to 0) while the system remains stable.
   
   <img width="1872" height="537" alt="Image" src="https://github.com/user-attachments/assets/d641c64c-4023-49f6-b05b-49edf311ef76" />
   
   I captured a memory dump of the broker and found what appears to be a state 
inconsistency between `PersistentDispatcherSingleActiveConsumer` and 
`NonDurableCursorImpl`:
   1. The dispatcher's `havePendingRead` flag remains `true` (indicating an 
ongoing read operation)
   2. However, the cursor shows:
       - `pendingReadOps = 0`
       - `waitingReadOp = null`
       - No visible read operations in progress
   
   
   ### Error messages
   
   ```text
   2025-09-01T16:20:33,644+0800 [broker-topic-workers-OrderedExecutor-5-0] ERROR org.apache.bookkeeper.common.util.SingleThreadExecutor - Error while running task: Object has been recycled already.
   java.lang.IllegalStateException: Object has been recycled already.
        at io.netty.util.Recycler$DefaultHandle.toAvailable(Recycler.java:284) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
        at io.netty.util.Recycler$LocalPool.release(Recycler.java:337) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
        at io.netty.util.Recycler$DefaultHandle.recycle(Recycler.java:257) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer$ReadEntriesCtx.recycle(PersistentDispatcherSingleActiveConsumer.java:651) ~[org.apache.pulsar-pulsar-broker-3.0.7.gemini.1.jar:3.0.7.gemini.1]
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:165) ~[org.apache.pulsar-pulsar-broker-3.0.7.gemini.1.jar:3.0.7.gemini.1]
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:158) ~[org.apache.pulsar-pulsar-broker-3.0.7.gemini.1.jar:3.0.7.gemini.1]
        at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.gemini.1.jar:4.16.6.gemini.1]
        at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:113) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.gemini.1.jar:4.16.6.gemini.1]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
        at java.lang.Thread.run(Thread.java:840) ~[?:?]
   ```
   
   ### Reproducing the issue
   
   The issue cannot be reproduced reliably yet, but it has occurred quite often recently.
   
   ### Additional information
   
   The topic `stats` (`pulsar-admin topics stats`):
   ```json
   {
       "msgRateIn": 2865.830615952374,
       "msgThroughputIn": 1375341.9489821792,
       "msgRateOut": 17195.656915001044,
       "msgThroughputOut": 8252158.974261264,
       "bytesInCounter": 8025929146395,
       "msgInCounter": 16168225916,
       "bytesOutCounter": 71740195736552,
       "msgOutCounter": 141897883743,
       "averageMsgSize": 479.9104110781945,
       "msgChunkPublished": false,
       "storageSize": 106950077339,
       "backlogSize": 11346044423,
       "publishRateLimitedTimes": 0,
       "earliestMsgPublishTimeInBacklogs": 0,
       "offloadedStorageSize": 106395360285,
       "lastOffloadLedgerId": 210635025,
       "lastOffloadSuccessTimeStamp": 1756724140842,
       "lastOffloadFailureTimeStamp": 0,
       "ongoingTxnCount": 0,
       "abortedTxnCount": 0,
       "committedTxnCount": 0,
       "publishers": [...],
       "waitingPublishers": 0,
       "subscriptions": {
           ...,
           "xxx-reader-c45f5b8585": {
               "msgRateOut": 0.0,
               "msgThroughputOut": 0.0,
               "bytesOutCounter": 54528649459,
               "msgOutCounter": 104377143,
               "msgRateRedeliver": 0.0,
               "messageAckRate": 0.0,
               "chunkedMessageRate": 0,
               "msgBacklog": 2207000,
               "backlogSize": 11346039239,
               "earliestMsgPublishTimeInBacklog": 0,
               "msgBacklogNoDelayed": 2207000,
               "blockedSubscriptionOnUnackedMsgs": false,
               "msgDelayed": 0,
               "unackedMessages": 0,
               "type": "Exclusive",
               "activeConsumerName": "cdd1e",
               "msgRateExpired": 0.0,
               "totalMsgExpired": 0,
               "lastExpireTimestamp": 1756724399200,
               "lastConsumedFlowTimestamp": 1756714832707,
               "lastConsumedTimestamp": 1756714833634,
               "lastAckedTimestamp": 1756714833690,
               "lastMarkDeleteAdvancedTimestamp": 1756714833690,
               "consumers": [{
                       "msgRateOut": 0.0,
                       "msgThroughputOut": 0.0,
                       "bytesOutCounter": 54528649459,
                       "msgOutCounter": 104377143,
                       "msgRateRedeliver": 0.0,
                       "messageAckRate": 0.0,
                       "chunkedMessageRate": 0.0,
                       "consumerName": "cdd1e",
                       "availablePermits": 7857,
                       "unackedMessages": 0,
                       "avgMessagesPerEntry": 11,
                       "blockedConsumerOnUnackedMsgs": false,
                       "lastAckedTimestamp": 1756714833690,
                       "lastConsumedTimestamp": 1756714833634,
                       "lastConsumedFlowTimestamp": 1756714832707,
                       "metadata": {},
                       "address": "/xxx:54340",
                       "connectedSince": "2025-09-01T00:00:13.09212201+08:00",
                       "clientVersion": "Pulsar-Java-v3.0.13",
                       "lastAckedTime": "2025-09-01T16:20:33.69+08:00",
                       "lastConsumedTime": "2025-09-01T16:20:33.634+08:00"
                   }
               ],
               "isDurable": false,
               "isReplicated": false,
               "allowOutOfOrderDelivery": false,
               "consumersAfterMarkDeletePosition": {},
               "nonContiguousDeletedMessagesRanges": 0,
               "nonContiguousDeletedMessagesRangesSerializedSize": 0,
               "delayedMessageIndexSizeInBytes": 0,
               "subscriptionProperties": {},
               "filterProcessedMsgCount": 0,
               "filterAcceptedMsgCount": 0,
               "filterRejectedMsgCount": 0,
               "filterRescheduledMsgCount": 0,
               "durable": false,
               "replicated": false
           },
           ...
       },
       "replication": {},
       "deduplicationStatus": "Disabled",
       "nonContiguousDeletedMessagesRanges": 256,
       "nonContiguousDeletedMessagesRangesSerializedSize": 5368,
       "delayedMessageIndexSizeInBytes": 0,
       "compaction": {
           "lastCompactionRemovedEventCount": 0,
           "lastCompactionSucceedTimestamp": 0,
           "lastCompactionFailedTimestamp": 0,
           "lastCompactionDurationTimeInMills": 0
       },
       "ownerBroker": "xxx:8080"
   }
   ```
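
The symptom in the stats above (a backlog, available permits, and zero outbound rate) can be checked mechanically. The following is a minimal sketch, assuming the `pulsar-admin topics stats` output has been parsed into a Python dict; the function name `stalled_subscriptions` and the exact combination of conditions are illustrative assumptions, not an official Pulsar health check.

```python
def stalled_subscriptions(topic_stats):
    """Return names of subscriptions that look stalled.

    Heuristic (an assumption for illustration): the subscription has a
    backlog, dispatches nothing, is not blocked on unacked messages, and
    at least one unblocked consumer still has permits available.
    """
    stalled = []
    for name, sub in topic_stats.get("subscriptions", {}).items():
        has_backlog = sub.get("msgBacklog", 0) > 0
        idle = sub.get("msgRateOut", 0.0) == 0.0
        blocked = sub.get("blockedSubscriptionOnUnackedMsgs", False)
        has_permits = any(
            c.get("availablePermits", 0) > 0
            and not c.get("blockedConsumerOnUnackedMsgs", False)
            for c in sub.get("consumers", [])
        )
        if has_backlog and idle and has_permits and not blocked:
            stalled.append(name)
    return stalled


# Values copied from the stats in this report (other fields omitted).
sample = {
    "subscriptions": {
        "xxx-reader-c45f5b8585": {
            "msgRateOut": 0.0,
            "msgBacklog": 2207000,
            "blockedSubscriptionOnUnackedMsgs": False,
            "consumers": [
                {"availablePermits": 7857, "blockedConsumerOnUnackedMsgs": False}
            ],
        }
    }
}
print(stalled_subscriptions(sample))
```

A single snapshot can catch a healthy subscription between dispatches, so a check like this is more meaningful when it holds across several samples taken some seconds apart.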
   
   The topic `stats-internal` (`pulsar-admin topics stats-internal`):
   ```json
   {
       "entriesAddedCounter": 1548744096,
       "numberOfEntries": 20398660,
       "totalSize": 106975197845,
       "currentLedgerEntries": 112446,
       "currentLedgerSize": 579840850,
       "lastLedgerCreatedTimestamp": "2025-09-01T18:54:52.871+08:00",
       "waitingCursorsCount": 0,
       "pendingAddEntriesCount": 1,
       "lastConfirmedEntry": "210637258:112444",
       "state": "LedgerOpened",
       "ledgers": [{
               "ledgerId": 210321221,
               "entries": 181771,
               "size": 937919906,
               "offloaded": true,
               "underReplicated": false,
               "bookkeeperDeleted": true
           }, ..., {
               "ledgerId": 210637258,
               "entries": 0,
               "size": 0,
               "offloaded": false,
               "underReplicated": false,
               "bookkeeperDeleted": false
           }
       ],
       "cursors": {
           ...,
           "xxx-reader-c45f5b8585": {
               "markDeletePosition": "210600931:90642",
               "readPosition": "210600931:90643",
               "waitingReadOp": false,
               "pendingReadOps": 0,
               "messagesConsumedCounter": 1546532189,
               "cursorLedger": -1,
               "cursorLedgerLastEntry": -1,
               "individuallyDeletedMessages": "[]",
               "lastLedgerSwitchTimestamp": "2025-09-01T00:00:13.091+08:00",
               "state": "Open",
               "active": false,
               "numberOfEntriesSinceFirstNotAckedMessage": 1,
               "totalNonContiguousDeletedMessagesRange": 0,
               "subscriptionHavePendingRead": true,
               "subscriptionHavePendingReplayRead": false,
               "properties": {}
           },
           ...
       },
       "schemaLedgers": [],
       "compactedLedger": {
           "ledgerId": -1,
           "entries": -1,
           "size": -1,
           "offloaded": false,
           "underReplicated": false,
           "bookkeeperDeleted": false
       }
   }
   ```
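
The inconsistency described in this report is visible in the `stats-internal` output: `subscriptionHavePendingRead` is `true` while the cursor reports `pendingReadOps: 0` and `waitingReadOp: false`. Below is a minimal sketch for flagging such cursors, assuming the JSON has been parsed into a Python dict. The function name `cursors_with_orphaned_pending_read` is hypothetical, and the check is a heuristic based only on the fields shown above, not a definitive diagnosis.

```python
def cursors_with_orphaned_pending_read(internal_stats):
    """Return cursor names where the dispatcher claims a pending read
    but the cursor itself shows no read in flight and no waiting read."""
    suspects = []
    for name, cursor in internal_stats.get("cursors", {}).items():
        dispatcher_waiting = cursor.get("subscriptionHavePendingRead", False)
        cursor_idle = (
            cursor.get("pendingReadOps", 0) == 0
            and not cursor.get("waitingReadOp", False)
        )
        if dispatcher_waiting and cursor_idle:
            suspects.append(name)
    return suspects


# Values copied from the stats-internal output in this report.
internal_sample = {
    "cursors": {
        "xxx-reader-c45f5b8585": {
            "waitingReadOp": False,
            "pendingReadOps": 0,
            "subscriptionHavePendingRead": True,
        }
    }
}
print(cursors_with_orphaned_pending_read(internal_sample))
```

Because these fields are sampled without synchronization, a single match may be a transient state; a cursor that matches repeatedly while its `readPosition` does not advance is the stronger signal.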
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
