massakam commented on a change in pull request #200: URL: https://github.com/apache/pulsar-client-node/pull/200#discussion_r825675677
########## File path: src/Consumer.cc ########## @@ -243,160 +235,229 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker { private: Napi::Promise::Deferred deferred; - pulsar_consumer_t *cConsumer; - pulsar_message_t *cMessage; + std::shared_ptr<pulsar_consumer_t> cConsumer; + std::shared_ptr<pulsar_message_t> cMessage; int64_t timeout; }; Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) { Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); if (info[0].IsUndefined()) { - ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer); + ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer); wk->Queue(); } else { Napi::Number timeout = info[0].As<Napi::Object>().ToNumber(); - ConsumerReceiveWorker *wk = - new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer, timeout.Int64Value()); + ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value()); wk->Queue(); } return deferred.Promise(); } -void Consumer::Acknowledge(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - Message *msg = Message::Unwrap(obj); - pulsar_consumer_acknowledge_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL); +Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) { + auto obj = info[0].As<Napi::Object>(); + auto msg = Message::Unwrap(obj); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_acknowledge_async( + this->cConsumer.get(), msg->GetCMessage().get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to consumer acknowledge: ") + pulsar_result_str(result)); + } else { + deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null())); + } + + self->Unref(); + }, + ctx); + + return deferred->Promise(); } -void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - MessageId *msgId = MessageId::Unwrap(obj); - pulsar_consumer_acknowledge_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL, NULL); +Napi::Value Consumer::AcknowledgeId(const Napi::CallbackInfo &info) { + auto obj = info[0].As<Napi::Object>(); + auto *msgId = MessageId::Unwrap(obj); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_acknowledge_async_id( + this->cConsumer.get(), msgId->GetCMessageId().get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to consumer acknowledge id: ") + pulsar_result_str(result)); + } else { + deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null())); + } + + self->Unref(); + }, + ctx); + + return deferred->Promise(); } void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) { Napi::Object obj = info[0].As<Napi::Object>(); Message *msg = Message::Unwrap(obj); - pulsar_consumer_negative_acknowledge(this->wrapper->cConsumer, msg->GetCMessage()); + std::shared_ptr<pulsar_message_t> cMessage = msg->GetCMessage(); + pulsar_consumer_negative_acknowledge(this->cConsumer.get(), cMessage.get()); } void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) { Napi::Object obj = info[0].As<Napi::Object>(); MessageId *msgId = MessageId::Unwrap(obj); - pulsar_consumer_negative_acknowledge_id(this->wrapper->cConsumer, msgId->GetCMessageId()); + std::shared_ptr<pulsar_message_id_t> cMessageId = msgId->GetCMessageId(); + pulsar_consumer_negative_acknowledge_id(this->cConsumer.get(), cMessageId.get()); } -void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - Message *msg = Message::Unwrap(obj); - pulsar_consumer_acknowledge_cumulative_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL); +Napi::Value Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) { + auto obj = info[0].As<Napi::Object>(); + auto *msg = Message::Unwrap(obj); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_acknowledge_cumulative_async( + this->cConsumer.get(), msg->GetCMessage().get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to consumer acknowledge cumulative: ") + + pulsar_result_str(result)); + } else { + deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null())); + } + + self->Unref(); + }, + ctx); + + return deferred->Promise(); } -void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - MessageId *msgId = MessageId::Unwrap(obj); - pulsar_consumer_acknowledge_cumulative_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL, - NULL); +Napi::Value Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) { + auto obj = info[0].As<Napi::Object>(); + auto *msgId = MessageId::Unwrap(obj); + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext<Consumer *>(this, deferred); + this->Ref(); + + pulsar_consumer_acknowledge_cumulative_async_id( + this->cConsumer.get(), msgId->GetCMessageId().get(), + [](pulsar_result result, void *ctx) { + auto deferredContext = static_cast<ExtDeferredContext<Consumer *> *>(ctx); + auto deferred = deferredContext->deferred; + auto self = deferredContext->ref; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to consumer acknowledge cumulative id: ") + Review comment: ```suggestion deferred->Reject(std::string("Consumer failed to acknowledge cumulatively by id: ") + ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org