jobmission opened a new issue, #4820:
URL: https://github.com/apache/seatunnel/issues/4820

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   job failed after  submit to cluster
   
   ### SeaTunnel Version
   
   2.3.1
   
   ### SeaTunnel Config
   
   ```conf
   seatunnel:
     engine:
       backup-count: 1
       queue-type: blockingqueue
       print-execution-info-interval: 60
       print-job-metrics-info-interval: 60
       slot-service:
         dynamic-slot: true
       checkpoint:
         interval: 10000
         timeout: 60000
         max-concurrent: 5
         tolerable-failure: 2
         storage:
           type: hdfs
           max-retained: 3
           plugin-config:
             namespace: /tmp/seatunnel/checkpoint_snapshot
             storage.type: hdfs
             fs.defaultFS: file:///tmp/
   ```
   
   
   ### Running Command
   
   ```shell
   env {
       job.mode = "STREAMING"
   }
   
   source {
       RabbitMQ {
           host = "rabbitmq.local"
               port = 5672
               virtual_host = "/v2"
               username = "guest"
               password = "guest"
               automaticRecovery = "true"
               queue_name = "jian2"
               schema = {
               fields {
                   commandType = string
                   requestId = string
               }
           }
       }
   }
   
   transform {}
   
   
   
   sink {
       Redis {
         host = "redis.local"
         port = 6379
         key = commandType
         data_type = key
         auth="xxxxxxx"
       }
   }
   ```
   
   
   ### Error Exception
   
   ```log
   java.util.concurrent.CompletionException: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: 
CheckpointCoordinator inside have error.
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:215)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:211)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:390)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
 Source)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown 
Source)
        at 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture.internalComplete(PassiveCompletableFuture.java:70)
        at 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture.lambda$new$0(PassiveCompletableFuture.java:33)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
 Source)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown 
Source)
        at 
org.apache.seatunnel.engine.server.checkpoint.PendingCheckpoint.acknowledgeTask(PendingCheckpoint.java:147)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:581)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:261)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241)
        at 
com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61)
        at 
org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMasterNode(NodeEngineUtil.java:41)
        at 
org.apache.seatunnel.engine.server.execution.TaskExecutionContext.sendToMaster(TaskExecutionContext.java:43)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.ack(SeaTunnelTask.java:335)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:196)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:71)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51)
        at 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:161)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:526)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
   Caused by: java.util.concurrent.CompletionException: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException:
 ErrorCode:[RABBITMQ-05], ErrorDescription:[messages could not be acknowledged 
during checkpoint creation] - java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
        at 
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
        at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
        at 
com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:663)
        at 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSourceReader.acknowledgeDeliveryTags(RabbitmqSourceReader.java:196)
        at 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSourceReader.notifyCheckpointComplete(RabbitmqSourceReader.java:187)
        at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:352)
        at 
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:366)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown 
Source)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown 
Source)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(Unknown 
Source)
        at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown 
Source)
        at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown 
Source)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown 
Source)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown
 Source)
        at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
        at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:366)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:352)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
        at 
org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241)
        at 
com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61)
        at 
org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:51)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:272)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown 
Source)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown 
Source)
        at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown 
Source)
        at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown 
Source)
        at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
        at 
java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown Source)
        at java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.notifyCheckpointCompleted(CheckpointCoordinator.java:662)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:645)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:388)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
 Source)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown 
Source)
        at 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture.internalComplete(PassiveCompletableFuture.java:70)
        at 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture.lambda$new$0(PassiveCompletableFuture.java:33)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
 Source)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown 
Source)
        at 
org.apache.seatunnel.engine.server.checkpoint.PendingCheckpoint.acknowledgeTask(PendingCheckpoint.java:147)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:581)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:261)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241)
        at 
com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61)
        at 
org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMasterNode(NodeEngineUtil.java:41)
        at 
org.apache.seatunnel.engine.server.execution.TaskExecutionContext.sendToMaster(TaskExecutionContext.java:43)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.ack(SeaTunnelTask.java:335)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:196)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:71)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51)
        at 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:161)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:526)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
   Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; 
protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, 
method-id=80)
        at 
com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at 
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at 
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
        at 
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
        ... 91 more
   Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; 
protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, 
method-id=80)
        at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
        at 
com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
        at 
com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
        at 
com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
        at 
com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
        ... 1 more
   ```
   
   
   ### Flink or Spark Version
   
   zera
   
   ### Java or Scala Version
   
   jre:11
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to