Matt-Esch commented on code in PR #200: URL: https://github.com/apache/pulsar-client-node/pull/200#discussion_r965248019
########## src/Consumer.cc: ########## @@ -243,160 +235,225 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker { private: Napi::Promise::Deferred deferred; - pulsar_consumer_t *cConsumer; - pulsar_message_t *cMessage; + std::shared_ptr<pulsar_consumer_t> cConsumer; + std::shared_ptr<pulsar_message_t> cMessage; int64_t timeout; }; Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) { Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); if (info[0].IsUndefined()) { - ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer); + ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer); wk->Queue(); } else { Napi::Number timeout = info[0].As<Napi::Object>().ToNumber(); - ConsumerReceiveWorker *wk = - new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer, timeout.Int64Value()); + ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value()); wk->Queue(); } return deferred.Promise(); } -void Consumer::Acknowledge(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - Message *msg = Message::Unwrap(obj); - pulsar_consumer_acknowledge_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL); +Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) { + auto obj = info[0].As<Napi::Object>(); + auto msg = Message::Unwrap(obj); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_acknowledge_async( + this->cConsumer.get(), msg->GetCMessage().get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to acknowledge: ") + pulsar_result_str(result)); + } else { + deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null())); + } + + self->Unref(); + }, + ctx); + + return deferred->Promise(); } -void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - MessageId *msgId = MessageId::Unwrap(obj); - pulsar_consumer_acknowledge_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL, NULL); +Napi::Value Consumer::AcknowledgeId(const Napi::CallbackInfo &info) { + auto obj = info[0].As<Napi::Object>(); + auto *msgId = MessageId::Unwrap(obj); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_acknowledge_async_id( + this->cConsumer.get(), msgId->GetCMessageId().get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to acknowledge id: ") + pulsar_result_str(result)); + } else { + deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null())); + } + + self->Unref(); + }, + ctx); + + return deferred->Promise(); } void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) { Napi::Object obj = info[0].As<Napi::Object>(); Message *msg = Message::Unwrap(obj); - pulsar_consumer_negative_acknowledge(this->wrapper->cConsumer, msg->GetCMessage()); + std::shared_ptr<pulsar_message_t> cMessage = msg->GetCMessage(); + pulsar_consumer_negative_acknowledge(this->cConsumer.get(), cMessage.get()); } void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) { Napi::Object obj = info[0].As<Napi::Object>(); MessageId *msgId = MessageId::Unwrap(obj); - pulsar_consumer_negative_acknowledge_id(this->wrapper->cConsumer, msgId->GetCMessageId()); + std::shared_ptr<pulsar_message_id_t> cMessageId = msgId->GetCMessageId(); + pulsar_consumer_negative_acknowledge_id(this->cConsumer.get(), cMessageId.get()); } -void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - Message *msg = Message::Unwrap(obj); - pulsar_consumer_acknowledge_cumulative_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL); +Napi::Value Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) { + auto obj = info[0].As<Napi::Object>(); + auto *msg = Message::Unwrap(obj); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_acknowledge_cumulative_async( + this->cConsumer.get(), msg->GetCMessage().get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to acknowledge cumulatively: ") + pulsar_result_str(result)); + } else { + deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null())); + } + + self->Unref(); + }, + ctx); + + return deferred->Promise(); } -void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - MessageId *msgId = MessageId::Unwrap(obj); - pulsar_consumer_acknowledge_cumulative_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL, - NULL); +Napi::Value Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) { + auto obj = info[0].As<Napi::Object>(); + auto *msgId = MessageId::Unwrap(obj); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_acknowledge_cumulative_async_id( + this->cConsumer.get(), msgId->GetCMessageId().get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to acknowledge cumulatively by id: ") + + pulsar_result_str(result)); + } else { + deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null())); + } + + self->Unref(); + }, + ctx); + + return deferred->Promise(); } Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); - return Napi::Boolean::New(env, pulsar_consumer_is_connected(this->wrapper->cConsumer)); + return Napi::Boolean::New(env, pulsar_consumer_is_connected(this->cConsumer.get())); } -class ConsumerCloseWorker : public Napi::AsyncWorker { - public: - ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer, - Consumer *consumer) - : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), - deferred(deferred), - cConsumer(cConsumer), - consumer(consumer) {} - - ~ConsumerCloseWorker() {} - void Execute() { - pulsar_consumer_pause_message_listener(this->cConsumer); - pulsar_result result = pulsar_consumer_close(this->cConsumer); - if (result != pulsar_result_Ok) { - SetError(pulsar_result_str(result)); - } - } - void OnOK() { - this->consumer->Cleanup(); - this->deferred.Resolve(Env().Null()); - } - void OnError(const Napi::Error &e) { - this->deferred.Reject( - Napi::Error::New(Env(), std::string("Failed to close consumer: ") + e.Message()).Value()); - } - - private: - Napi::Promise::Deferred deferred; - pulsar_consumer_t *cConsumer; - Consumer *consumer; -}; - -class ConsumerUnsubscribeWorker : public Napi::AsyncWorker { - public: - ConsumerUnsubscribeWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer, - Consumer *consumer) - : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), - deferred(deferred), - cConsumer(cConsumer), - consumer(consumer) {} - - ~ConsumerUnsubscribeWorker() {} - void Execute() { - pulsar_consumer_pause_message_listener(this->cConsumer); - pulsar_result result = pulsar_consumer_unsubscribe(this->cConsumer); - if (result != pulsar_result_Ok) { - SetError(pulsar_result_str(result)); - } - } - void OnOK() { - this->consumer->Cleanup(); - this->deferred.Resolve(Env().Null()); - } - void OnError(const Napi::Error &e) { - this->deferred.Reject( - Napi::Error::New(Env(), std::string("Failed to unsubscribe consumer: ") + e.Message()).Value()); - } - - private: - Napi::Promise::Deferred deferred; - pulsar_consumer_t *cConsumer; - Consumer *consumer; -}; - void Consumer::Cleanup() { - if (this->listener) { - this->CleanupListener(); + if (this->listener != nullptr) { + pulsar_consumer_pause_message_listener(this->cConsumer.get()); + this->listener->callback.Release(); + this->listener = nullptr; + this->Unref(); } } -void Consumer::CleanupListener() { - pulsar_consumer_pause_message_listener(this->wrapper->cConsumer); - this->Unref(); - this->listener->callback.Release(); - this->listener = nullptr; -} - Napi::Value Consumer::Close(const Napi::CallbackInfo &info) { - Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer, this); - wk->Queue(); - return deferred.Promise(); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_pause_message_listener(this->cConsumer.get()); + pulsar_consumer_close_async( + this->cConsumer.get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to close consumer: ") + pulsar_result_str(result)); + } else { + self->Cleanup(); + deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null())); + } + + self->Unref(); + }, + ctx); + + return deferred->Promise(); } Napi::Value Consumer::Unsubscribe(const Napi::CallbackInfo &info) { - Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - ConsumerUnsubscribeWorker *wk = new ConsumerUnsubscribeWorker(deferred, this->wrapper->cConsumer, this); - wk->Queue(); - return deferred.Promise(); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_pause_message_listener(this->cConsumer.get()); + pulsar_consumer_unsubscribe_async( + this->cConsumer.get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to unsubscribe consumer: ") + pulsar_result_str(result)); + } else { + self->Cleanup(); + deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null())); + } + + self->Unref(); + }, + ctx); + + return deferred->Promise(); } Consumer::~Consumer() { - if (this->listener) { - this->CleanupListener(); + this->Cleanup(); + while (this->Unref() != 0) { + // If Ref() > 0 then the process is shutting down. We must unref to prevent + // double free (once for the env shutdown and once for non-zero refs) } Review Comment: For future reference - the wrapped object should be held by an object reference if we want to keep it alive - see https://github.com/nodejs/node-addon-api/issues/1153#issuecomment-1118974995 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org