dlecocq opened a new issue #132:
URL: https://github.com/apache/pulsar-client-node/issues/132


   We are unable to cleanly close our consumer when using `topicsPattern`:
   
   ```
   2020-10-27 11:15:43.304 INFO  [0x70000d6d5000] ConnectionPool:85 | Created connection for pulsar://pulsar.data-access-platform-portal.docker:6650
   2020-10-27 11:15:43.307 INFO  [0x70000ef61000] ClientConnection:343 | [192.168.99.1:58802 -> 192.168.99.100:6650] Connected to broker
   Closing consumer...
   (node:35996) UnhandledPromiseRejectionWarning: Error: Failed to close consumer: AlreadyClosed
   (node:35996) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
   (node:35996) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
   ```
   
   That's the result from running this code snippet (with the appropriate 
docker container running):
   
   ```
   const Bluebird = require('bluebird')
   const Pulsar = require('pulsar-client')
   
   const doit = async () => {
     const client = new Pulsar.Client({
       webServiceUrl: 'http://pulsar.data-access-platform-portal.docker:8080',
       serviceUrl: 'pulsar://pulsar.data-access-platform-portal.docker:6650'
     })
     const consumer = await client.subscribe({
       topicsPattern: `non-persistent://public/default/topic`,
       subscription: 'test-subscription',
       subscriptionType: 'Shared',
       subscriptionInitialPosition: 'Earliest'
     })
   
     // Rather than push messages to consume, we're just waiting for a second
     await Bluebird.delay(1000)
   
     console.log('Closing consumer...')
     // We've also tried omitting this line, but it still produces errors
     await consumer.close()
     console.log('Closing client...')
     await client.close()
   }
   
   doit()
   ```
   
   We've tried this with Node.js versions 10.16.3, 12.16.0, and 14.11.0 and 
Pulsar 2.6.1.
   
   I _suspect_ the root cause is that the consumer implementation in this 
library _appears_ to assume there's only a single consumer: 
https://github.com/apache/pulsar-client-node/blob/f010fb2ed3c530af8006ba3d3a7b73c6b46f6507/src/Consumer.cc#L178
 overwrites the wrapper's consumer with each invocation. That wouldn't be a 
problem with a single topic, but with several it seems that the last consumer 
"wins" and becomes the consumer saved in the wrapper. There may also be a race 
condition over whether the listener handler gets stitched up correctly from 
the config: that happens at 
https://github.com/apache/pulsar-client-node/blob/f010fb2ed3c530af8006ba3d3a7b73c6b46f6507/src/Consumer.cc#L184
 but the consumer config is subsequently deleted.
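
To make the suspected "last consumer wins" behaviour concrete, here is a toy model in plain JavaScript. It is not the library's code: `FakeNativeConsumer`, `Wrapper` and `onSubscribed` are made-up names, and the model only assumes what is described above, namely that a callback runs once per underlying consumer and overwrites a single stored handle.

```javascript
// Toy model only: none of these names exist in pulsar-client-node.
class FakeNativeConsumer {
  constructor (topic) {
    this.topic = topic
    this.closed = false
  }

  close () {
    this.closed = true
  }
}

class Wrapper {
  constructor () {
    this.consumer = null
  }

  // Assumed to run once per underlying consumer; every call replaces
  // the handle stored by the previous call.
  onSubscribed (nativeConsumer) {
    this.consumer = nativeConsumer
  }

  // Only the most recently stored handle can be reached from here.
  close () {
    this.consumer.close()
  }
}

const natives = ['topic-a', 'topic-b', 'topic-c'].map(
  (topic) => new FakeNativeConsumer(topic)
)
const wrapper = new Wrapper()
natives.forEach((native) => wrapper.onSubscribed(native))

wrapper.close()

// Consumers the wrapper lost track of are never closed by it.
const stillOpen = natives.filter((n) => !n.closed).map((n) => n.topic)
console.log(stillOpen) // [ 'topic-a', 'topic-b' ]
```

In this model the wrapper can only ever close the last handle it was given. Whether the real extension behaves this way, and how that would lead to `AlreadyClosed`, is exactly what is unconfirmed.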
   
   Looking at `ClientImpl` in the Pulsar C++ code, it appears that it also 
takes responsibility for closing any open consumers and producers, and that it 
correctly tracks all newly created consumers. So we also tried omitting 
`consumer.close()` and relying on `client.close()` alone to close the 
consumers, but that _also_ fails with `AlreadyClosed`.
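
Until the cause is understood, one possible stopgap is to tolerate this specific error at shutdown. The sketch below is only that: `closeQuietly` is a hypothetical helper, and matching on the text `AlreadyClosed` in the error message is an assumption based on the log above. It hides the symptom and does not fix whatever closes the consumer early. The stand-in object at the end exists only so the sketch runs without a broker.

```javascript
// Hypothetical helper: calls close() on a consumer or client and swallows
// only the AlreadyClosed failure seen in the log above.
const closeQuietly = async (closable, label) => {
  try {
    await closable.close()
    return true
  } catch (err) {
    // Assumption: the error message contains "AlreadyClosed", as in the log.
    if (err instanceof Error && err.message.includes('AlreadyClosed')) {
      console.warn(`${label} was already closed; ignoring`)
      return false
    }
    // Anything else is unexpected and should still surface.
    throw err
  }
}

// Stand-in for a consumer whose close() fails the way the log shows.
const fakeConsumer = {
  close: async () => {
    throw new Error('Failed to close consumer: AlreadyClosed')
  }
}

const main = async () => {
  const closedCleanly = await closeQuietly(fakeConsumer, 'consumer')
  console.log('closed cleanly:', closedCleanly)
}

// Attaching a catch handler avoids the UnhandledPromiseRejectionWarning.
main().catch((err) => {
  console.error(err)
  process.exitCode = 1
})
```

In the snippet above, this would mean calling `await closeQuietly(consumer, 'consumer')` and `await closeQuietly(client, 'client')` in place of the bare `close()` calls, and ending with `doit().catch(...)` rather than a bare `doit()`.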
   
   All that said, it is a little tricky to track all of it because of the 
number of levels of indirection - from the JS to the C++ extension to the C 
bindings which delegate out to the C++ client code. So we might be way off 😆 



