HuGuangcui opened a new issue, #8757:
URL: https://github.com/apache/seatunnel/issues/8757

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   
![Image](https://github.com/user-attachments/assets/59ed13df-8e98-4597-b655-c3618cd4f7c6)
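   The three fields `durable`, `exclusive`, and `autoDelete` are not read correctly and end up without values, which causes a NullPointerException when the connection is created.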
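
One way a missing value can turn into a NullPointerException, sketched under the assumption that the three flags are held as boxed `Boolean` objects and unboxed when the queue is declared. The class `QueueFlagDefaults`, the holder `QueueFlags`, the helper `orDefault`, and the fallback values are all hypothetical and are not taken from the connector source; this only illustrates the suspected failure mode and a null-safe read.

```java
public class QueueFlagDefaults {

    /** Hypothetical holder for the three queue flags; not the connector's actual class. */
    static final class QueueFlags {
        final Boolean durable;
        final Boolean exclusive;
        final Boolean autoDelete;

        QueueFlags(Boolean durable, Boolean exclusive, Boolean autoDelete) {
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }
    }

    /** Auto-unboxes the Boolean; throws NullPointerException when the flag was never set. */
    static boolean unsafeDurable(QueueFlags flags) {
        return flags.durable;
    }

    /** Null-safe read: falls back to the supplied default when the flag is unset. */
    static boolean orDefault(Boolean value, boolean fallback) {
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // Simulates a config in which none of the three flags was read.
        QueueFlags unset = new QueueFlags(null, null, null);

        try {
            unsafeDurable(unset);
        } catch (NullPointerException e) {
            System.out.println("unboxing a null Boolean throws NullPointerException");
        }

        // Fallback values are assumptions chosen for illustration only.
        boolean durable = orDefault(unset.durable, true);
        boolean exclusive = orDefault(unset.exclusive, false);
        boolean autoDelete = orDefault(unset.autoDelete, false);
        System.out.println(durable + " " + exclusive + " " + autoDelete);
    }
}
```

If the connector does read the flags this way, giving each one a default at the point where the config is parsed would avoid the exception without requiring users to set all three options.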
   
   ### SeaTunnel Version
   
   2.3.9
   
   ### SeaTunnel Config
   
   ```conf
   env {
     job.mode = "streaming"
   }
   source {
     RabbitMQ {
       host = 192.168.1.191
       port = 5672
       virtual_host = /
       username = guest
       password = guest
       queue_name = student_info
       schema = {
         fields {
           id = "string"
           name = "string"
           age = "string"
           grade = "string"
           major = "string"
         }
       }
     }
   }
   sink {
     RabbitMQ {
       host = 192.168.1.191
       port = 5672
       virtual_host = /
       username = guest
       password = guest
       queue_name = student_info_link
       rabbitmq.config = {
         requested-heartbeat = 10
         connection-timeout = 10
       }
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   Ran with the SeaTunnelEngineLocalExample example
   ```
   
   ### Error Exception
   
   ```log
   [944076218452410369] 2025-02-18 11:41:59,211 ERROR org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job fake_to_console.conf (944076218452410369), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-RabbitMQ]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=944076218452410369, pipelineId=1, taskGroupId=30000}] end with state FAILED and Exception: org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException: ErrorCode:[RABBITMQ-02], ErrorDescription:[create rabbitmq client failed] - Error while create RMQ client with student_info at 192.168.1.191
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.<init>(RabbitmqClient.java:69)
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSourceReader.<init>(RabbitmqSourceReader.java:82)
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSource.createReader(RabbitmqSource.java:112)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.init(SourceFlowLifeCycle.java:124)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.init(SeaTunnelTask.java:135)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.init(SourceSeaTunnelTask.java:72)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:692)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at --- Async.Stack.Trace --- (captured by IntelliJ IDEA debugger)
        at java.util.concurrent.FutureTask.<init>(FutureTask.java:151)
        at java.util.concurrent.AbstractExecutorService.newTaskFor(AbstractExecutorService.java:87)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:111)
        at org.apache.seatunnel.api.tracing.MDCExecutorService.submit(MDCExecutorService.java:78)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.seatunnel.engine.server.TaskExecutionService.submitBlockingTask(TaskExecutionService.java:257)
        at org.apache.seatunnel.engine.server.TaskExecutionService.deployLocalTask(TaskExecutionService.java:407)
        at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:342)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.lambda$deployOnLocal$0(PhysicalVertex.java:282)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.deployInternal(PhysicalVertex.java:337)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.deployOnLocal(PhysicalVertex.java:276)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.deploy(PhysicalVertex.java:324)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.stateProcess(PhysicalVertex.java:571)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.updateTaskState(PhysicalVertex.java:403)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.makeTaskGroupDeploy(PhysicalVertex.java:317)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$stateProcess$20(SubPlan.java:651)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:647)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:633)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:624)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.startSubPlanStateProcess(SubPlan.java:608)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.restorePipeline(SubPlan.java:496)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:677)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$addPhysicalVertexCallBack$4(SubPlan.java:228)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire$$$capture(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java)
        at --- Async.Stack.Trace --- (captured by IntelliJ IDEA debugger)
        at java.util.concurrent.CompletableFuture$UniAccept.<init>(CompletableFuture.java:641)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:684)
        at java.util.concurrent.CompletableFuture.thenAcceptAsync(CompletableFuture.java:2019)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.addPhysicalVertexCallBack(SubPlan.java:198)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$initStateFuture$3(SubPlan.java:189)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.initStateFuture(SubPlan.java:187)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan.addPipelineEndCallback(PhysicalPlan.java:144)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.prepareRestorePipeline(SubPlan.java:472)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:674)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$addPhysicalVertexCallBack$4(SubPlan.java:228)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire$$$capture(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java)
        at --- Async.Stack.Trace --- (captured by IntelliJ IDEA debugger)
        at java.util.concurrent.CompletableFuture$UniAccept.<init>(CompletableFuture.java:641)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:684)
        at java.util.concurrent.CompletableFuture.thenAcceptAsync(CompletableFuture.java:2019)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.addPhysicalVertexCallBack(SubPlan.java:198)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$initStateFuture$2(SubPlan.java:184)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.initStateFuture(SubPlan.java:182)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan.addPipelineEndCallback(PhysicalPlan.java:144)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan.initStateFuture(PhysicalPlan.java:139)
        at org.apache.seatunnel.engine.server.master.JobMaster.initStateFuture(JobMaster.java:346)
        at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:288)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$6(CoordinatorService.java:649)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at --- Async.Stack.Trace --- (captured by IntelliJ IDEA debugger)
        at java.util.concurrent.FutureTask.<init>(FutureTask.java:151)
        at java.util.concurrent.AbstractExecutorService.newTaskFor(AbstractExecutorService.java:87)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:111)
        at org.apache.seatunnel.api.tracing.MDCExecutorService.submit(MDCExecutorService.java:78)
        at org.apache.seatunnel.engine.server.CoordinatorService.submitJob(CoordinatorService.java:633)
        at org.apache.seatunnel.engine.server.operation.SubmitJobOperation.doRun(SubmitJobOperation.java:69)
        at org.apache.seatunnel.engine.server.operation.AsyncOperation.run(AsyncOperation.java:52)
        at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438)
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601)
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580)
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541)
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241)
        at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61)
        at com.hazelcast.client.impl.protocol.task.AbstractInvocationMessageTask.processInternal(AbstractInvocationMessageTask.java:38)
        at com.hazelcast.client.impl.protocol.task.AbstractAsyncMessageTask.processMessage(AbstractAsyncMessageTask.java:71)
        at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.initializeAndProcessMessage(AbstractMessageTask.java:153)
        at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.run(AbstractMessageTask.java:116)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76)
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.declareQueueDefaults(RabbitmqClient.java:199)
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.setupQueue(RabbitmqClient.java:192)
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.<init>(RabbitmqClient.java:65)
        ... 112 more
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
