lipeining opened a new issue #181:
URL: https://github.com/apache/pulsar-client-node/issues/181


   os: Ubuntu 16.04.7 LTS
   nodejs: v14.17.0
   pulsar-client: 1.3.0
   pulsar c++ client: 2.7.1
   framework: midway.js
   
   Our web server is written in Node.js. When the server starts, it uses pulsar-client (we cloned a copy of branch-1.3) to connect to our Pulsar service and subscribe to some topics. Due to some bug, the server crashes after subscribing to a Pulsar topic. Would you please take some time to find the cause of the crash? Thanks a lot.
   
   Our web server subscribes to five or six topics, and it had been working well for a long time. Then, without a single line of JS code being changed, it started crashing.
   We spent some time tracking down what was happening.
   When the Node.js process crashed, we captured its core dump file,
   which gave us some information about why pulsar-client crashed.
   ### gcore
   `gcore -c corefileName`
   With the help of gcore and the core dump file, we got the following crash stack:
   ```txt
   thread #1: tid = 40071, 0x00007fffdfcd4af9 Pulsar.node`MessageListenerProxy(Napi::Env, Napi::Function, MessageListenerProxyData*) + 73, name = 'node', stop reason = signal SIGSEGV: invalid address (fault address: 0x10)
       frame #0: 0x00007fffdfcd4af9 Pulsar.node`MessageListenerProxy(Napi::Env, Napi::Function, MessageListenerProxyData*) + 73
     * frame #1: 0x00007fffdfcd5a37 Pulsar.node`std::_Function_handler<void (Napi::Env, Napi::Function), napi_status Napi::ThreadSafeFunction::BlockingCall<MessageListenerProxyData, void (*)(Napi::Env, Napi::Function, MessageListenerProxyData*)>(MessageListenerProxyData*, void (*)(Napi::Env, Napi::Function, MessageListenerProxyData*)) const::'lambda'(Napi::Env, Napi::Function)>::_M_invoke(std::_Any_data const&, Napi::Env&&, Napi::Function&&) + 23
       frame #2: 0x00007fffdfcbfbf2 Pulsar.node`Napi::ThreadSafeFunction::CallJS(napi_env__*, napi_value__*, void*, void*) + 130
       frame #3: 0x00000000009f3ab8 node`v8impl::(anonymous namespace)::ThreadSafeFunction::IdleCb(uv_idle_s*) + 328
       frame #4: node`uv__run_idle(loop=0x00000000044b0800) at loop-watcher.c:68
       frame #5: node`uv_run(loop=0x00000000044b0800, mode=UV_RUN_DEFAULT) at core.c:378
       frame #6: 0x0000000000a625f4 node`node::NodeMainInstance::Run() + 580
       frame #7: 0x00000000009f0ba5 node`node::Start(int, char**) + 277
       frame #8: libc.so.6`__libc_start_main(main=(node`main), argc=3, argv=0x00007fffffff6778, init=<unavailable>, fini=<unavailable>, rtld_fini=<unavailable>, stack_end=0x00007fffffff6768) at libc-start.c:291
       frame #9: 0x000000000098080c node`_start + 41
   ```
   Because we have a copy of the [branch-1.3] source code, we added some logging to the **MessageListenerProxy** function to find out why it crashes:
   ```c++
   void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
     printf("-->%s:%d:%p\n", __func__, __LINE__, data);
     printf("-->%s:%d:%p\n", __func__, __LINE__, data->cMessage);
     printf("-->%s:%d:%p\n", __func__, __LINE__, data->consumer);
   
     if (data->consumer == NULL) {
       printf("--consumer is nil-->%s:%d:%p\n", __func__, __LINE__, data->consumer);
       return;
     }
   
     Napi::Object msg = Message::NewInstance({}, data->cMessage);
     Consumer *consumer = data->consumer;
     delete data;
   
     jsCallback.Call({msg, consumer->Value()});
   }
   ```
   We found that data->consumer is a null pointer, so consumer->Value() crashes with the log:
   invalid address (fault address: 0x10)
   
![wecom-temp-bf2d896e05feb68775cc875bc9bd5857](https://user-images.githubusercontent.com/39306888/143686471-742458a1-4f81-4f20-9805-1860eac6beea.png)
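
As a possible mitigation on the proxy side, the null check could be combined with ownership of `data`, so that nothing is dereferenced and nothing is leaked when the consumer is missing. The following is only a minimal, self-contained sketch: `StubConsumer`, `ProxyData`, and `GuardedProxy` are hypothetical stand-ins and are not part of pulsar-client-node or node-addon-api.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for the addon's Consumer wrapper.
struct StubConsumer {
  int calls = 0;  // counts how many messages were delivered
};

// Hypothetical stand-in for MessageListenerProxyData.
struct ProxyData {
  std::string message;
  StubConsumer *consumer;  // may still be null if set up late
};

// Takes ownership of `raw` and returns true only if the message was delivered.
bool GuardedProxy(ProxyData *raw) {
  // unique_ptr frees the data on every return path, including the early one.
  std::unique_ptr<ProxyData> data(raw);
  if (!data || data->consumer == nullptr) {
    return false;  // consumer not attached yet: skip instead of crashing
  }
  data->consumer->calls += 1;  // stands in for invoking the JS callback
  return true;
}
```

A guard like this would only hide the symptom (the message is dropped); it does not explain why the consumer pointer is null.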
   
   ### possible crash reason
   After analysing the pulsar-client source code, we found that data->consumer is initialized inside
   ConsumerNewInstanceWorker::OnOK----->>>>>Consumer::SetListenerCallback
   
https://github.com/apache/pulsar-client-node/blob/branch-1.3/src/Consumer.cc#L89
   The ctx parameter of MessageListener is a ListenerCallback.
   ctx is the same object as ConsumerConfig->listener, which is passed on to Consumer->listener.
   In addition, Consumer::SetListenerCallback makes sure that ListenerCallback->consumer is properly initialized.
   
   So why is data->consumer a null pointer? We assume it is because it had not been initialized yet.
   ConsumerNewInstanceWorker initializes data->consumer on another thread, while MessageListener uses data->consumer on the main thread.
   Is it possible that data->consumer is not yet initialized when MessageListener uses it?
   Nothing guarantees the initialization order between these two threads.
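
To illustrate the suspected ordering problem, here is a minimal, self-contained sketch of one way to make the order irrelevant: messages that arrive before the consumer is attached are parked and flushed later. `FakeConsumer` and `ListenerContext` are hypothetical names used only for illustration; this is not how pulsar-client-node is implemented, and whether this race is the real cause is still an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the addon's Consumer wrapper.
struct FakeConsumer {
  std::vector<std::string> delivered;
  void Deliver(const std::string &msg) { delivered.push_back(msg); }
};

// Hypothetical context shared by the thread that creates the consumer
// and the thread that receives messages.
class ListenerContext {
 public:
  // Called when a message arrives. If no consumer is attached yet, the
  // message is parked instead of dereferencing a null pointer.
  void OnMessage(const std::string &msg) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (consumer_ == nullptr) {
      pending_.push_back(msg);
      return;
    }
    consumer_->Deliver(msg);
  }

  // Called once the consumer is fully constructed. Parked messages are
  // flushed in arrival order.
  void SetConsumer(FakeConsumer *consumer) {
    assert(consumer != nullptr);
    std::lock_guard<std::mutex> lock(mutex_);
    consumer_ = consumer;
    while (!pending_.empty()) {
      consumer_->Deliver(pending_.front());
      pending_.pop_front();
    }
  }

  // Number of messages still waiting for a consumer.
  std::size_t PendingCount() {
    std::lock_guard<std::mutex> lock(mutex_);
    return pending_.size();
  }

 private:
  std::mutex mutex_;
  FakeConsumer *consumer_ = nullptr;
  std::deque<std::string> pending_;
};
```

With this shape, calling `OnMessage` before `SetConsumer` only queues the message, so neither call order can lead to a null dereference.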
   
   ### here is the Code
   ```js
   const Pulsar = require('@yy/sl-pulsar-client');
   
   const subscription = 'test-sub-fork5';
   
   export default agent => {
     console.log('========== plugin agent ready ================');
     const clients = {};
   
     // just use fake [pulsarUrl, topicName, funcName]
     const Clients = {
       clientNameTest: {
         ClientConfig: {
             serviceUrl: 'pulsarUrl',
         },
         ConsumerConfigs: [
             {
                 topic: 'topicName',
                 HandleMethod: 'funcName',
             },
         ],
       },
     };
   
     async function connect() {
       for (const clientName of Object.keys(Clients)) {
         const clientConfig = Clients[clientName];
         const client = new Pulsar.Client(clientConfig.ClientConfig);
         clients[clientName] = client;
   
         await connectClient(clientName, client);
       }
     }
   
     async function connectClient(clientName, client) {
       const defaultConfig = {
         subscriptionType: 'Shared',
         subscription,
       };
       const clientConfig = Clients[clientName];
   
       for (const config of clientConfig.ConsumerConfigs) {
         const consumerConfig = { ...defaultConfig, ...config };
         const { topic, subscription, HandleMethod } = consumerConfig;
   
         try {
           await client.subscribe({
             ...consumerConfig,
             listener: (msg, res) => {
               const msgData = msg.getData().toString();
               const payload = parseMessagePayload({}, msgData);
   
               // fs.appendFileSync('./msg', JSON.stringify({ topic, msgData }));
   
               console.log(
                 '[egg-pulsar] received message topic: %s, subscription: %s, handleMethod: %s, msg: %s',
                 topic,
                 subscription,
                 HandleMethod,
                 msgData,
                 payload
               );
   
               res.acknowledge(msg);
             },
           });
   
           console.log(
             '[egg-pulsar] consumer subscribed successfully topic: %s, subscription: %s',
             topic,
             subscription
           );
         } catch (e) {
           console.log('e==>', e);
         }
       }
     }
   
     connect();
   
     agent.messenger.on('egg-ready', () => {
       console.log('plugin egg-ready!!!!!!!!!');
     });
   };
   
   function parseMessagePayload(agent, payload) {
     try {
       // validate the message format
       return JSON.parse(payload);
     } catch (e) {
       return console.log('[egg-pulsar] invalid payload message format', payload.toString());
     }
   }
   ```
   ### Pulsar Log
   ```txt
   2021-11-27 21:35:59.044 INFO  [139917541496576] Client:88 | Subscribing on Topic :persistent:fakeTopicName
   2021-11-27 21:35:59.045 INFO  [139917541496576] ConnectionPool:85 | Created connection for pulsar://fakePulsarUrl
   2021-11-27 21:35:59.045 INFO  [139917502453504] ClientConnection:356 | [10.86.46.192:40820 -> 10.86.5.121:8501] Connected to broker
   // /..
   // just crash down
   ```
   
   

