xxyykkxx opened a new issue, #9129: URL: https://github.com/apache/seatunnel/issues/9129
### Search before asking - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened When RocketMQ Job running a few days,the job throws `java.lang.NullPointerException` ### SeaTunnel Version 2.3.9 ### SeaTunnel Config ```conf { "env": { "job.mode": "streaming", "checkpoint.interval": "8000", "parallelism": 80 }, "source": [ { "plugin_name": "Rocketmq", "plugin_output": "source_tabl1", "name.srv.addr": "localhost:9876", "format": "json", "consumer.group": "tpws-seatunnel-source-record-topic-consumer", "topics": "tpws-op-new-record-topic", "batch.size": 600, "schema": { "fields": { "merged": "map<string,string>" } } } ], "transform": [ { "plugin_name": "Sql", "plugin_input": "source_tabl1", "plugin_output": "source_table2", "query": "select merged.uniqueNo unique_no,merged.orderNo order_no,merged.orderChannel order_channel,merged.accessDate access_date,merged.createTime create_time,merged.shopCode shop_code,merged.shopType shop_type,merged.shopName shop_name,merged.sellerFlag seller_flag,merged.promiseSendTime promise_send_time,merged.orderStatus order_status,merged.payAmount pay_amount,merged.payTime pay_time,SUBSTR(merged.payTime,0,10) pay_date,merged.orderSendTime order_send_time,SUBSTR(merged.orderSendTime,0,10) order_send_date,merged.takingTime taking_time,merged.sdsPlanTime sds_plan_time,merged.sdsTpwCode sds_tpw_code,merged.sdsTpwStatus sds_tpw_status,merged.sdsEstimateAmt sds_estimate_amt,merged.sdsIgnore sds_ignore,merged.sotPlanTime sot_plan_time,merged.sotTpwCode sot_tpw_code,merged.sotTpwStatus sot_tpw_status,merged.sotEstimateAmt sot_estimate_amt,merged.sotIgnore sot_ignore,merged.takingPlanTime taking_plan_time,merged.totTpwCode tot_tpw_code,merged.totTpwStatus tot_tpw_statu s,merged.totEstimateAmt tot_estimate_amt,merged.totIgnore tot_ignore,merged.tnuPlanTime tnu_plan_time,merged.tnuTpwCode tnu_tpw_code,merged.tnuStatus tnu_status,merged.tnuIgnore tnu_ignore,merged.tnuEstimateAmt tnu_estimate_amt,merged.tnuAfterTime tnu_after_time,merged.sdfPlanTime sdf_plan_time,merged.sdfTpwCode sdf_tpw_code,merged.sdfStatus sdf_status,merged.centerOptTime center_opt_time,merged.takingOrgCode taking_org_code,merged.takingOrgName taking_org_name,merged.takingOrgType taking_org_type,merged.takingManageCode taking_manage_code,merged.takingManageName taking_manage_name,merged.takingProvCode taking_prov_code,merged.takingProvName taking_prov_name,merged.takingAreaCode taking_area_code,merged.takingAreaName taking_area_name,merged.oudPlanTime tnh_plan_time,merged.oudTpwCode tnh_tpw_code,merged.oudStatus tnh_status,merged.handonTime handon_time,merged.unloadTime unload_time,merged.unloadOrgCode unload_org_code,merged.unloadOrgName unload_org_name,merged.ldcPlanTime ldc_pla n_time,merged.ldcTpwCode ldc_tpw_code,merged.ldcStatus ldc_status,merged.realSignTime real_sign_time,merged.ldcOrgCode ldc_org_code,merged.ldcOrgName ldc_org_name,merged.ldcOrgType ldc_org_type,merged.ldcManageCode ldc_manage_code,merged.ldcManageName ldc_manage_name,merged.ldcProvCode ldc_prov_code,merged.ldcProvName ldc_prov_name,merged.ldcAreaCode ldc_area_code,merged.ldcAreaName ldc_area_name,merged.achievePreTime achieve_pre_time,merged.lroPreTime lro_pre_time,merged.obtPlanTime obt_plan_time,merged.obtTpwCode obt_tpw_code,merged.obtTpwStatus obt_status,merged.stageTime stage_time,merged.sellerName seller_name,merged.sellerCode seller_code,merged.customerCode customer_code,merged.customerName customer_name,merged.arrivalManageCode arrival_manage_code,merged.arrivalManageName arrival_manage_name,merged.arrivalProvCode arrival_prov_code,merged.arrivalProvName arrival_prov_name,merged.arrivalAreaCode arrival_area_code,merged.arrivalAreaName arrival_area_name,merged.allotOrgCode al lot_org_code,merged.allotOrgType allot_org_type,merged.allotOrgName allot_org_name,merged.destOrgCode dest_org_code,merged.destOrgName dest_org_name,merged.destOrgType dest_org_type,merged.outManageCode out_manage_code,merged.outManageName out_manage_name,merged.outProvCode out_prov_code,merged.outProvName out_prov_name,merged.outAreaCode out_area_code,merged.outAreaName out_area_name,merged.orderNoType order_no_type,merged.ecvlTime ecvl_time,SUBSTR(merged.ecvlTime,0,10) ecvl_date,merged.unloadTime endo_time,SUBSTR(merged.unloadTime,0,10) endo_date,merged.timestamp version from source_tabl1" } ], "sink": [ { "plugin_name": "Doris", "plugin_input": "source_table2", "doris.config": { "format": "json", "read_json_by_line": "true" }, "fenodes": "localhost:8030", "password": "12345", "username": "aa", "database": "bb", "table": "cc", "doris.batch.size": 8192, "sink.buffer-count": 8, "sink.buffer-size": 2097152 } ] } ``` ### Running Command ```shell bin/seatunnel.sh ``` ### Error Exception ```log 2025-04-08 20:50:18,483 ERROR [.s.e.s.c.CheckpointCoordinator] [hz.main.generic-operation.thread-36] - report error from task org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.NullPointerException at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.restoreState(SourceFlowLifeCycle.java:378) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$231(SeaTunnelTask.java:426) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423) at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$385(NotifyTaskRestoreOperation.java:107) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121) at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100) at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617) at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.restoreState(SourceFlowLifeCycle.java:375) ... 18 more Caused by: java.lang.NullPointerException at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.getTaskMemberAddressByIndex(SourceSplitEnumeratorTask.java:263) at org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext.assignSplit(SeaTunnelSplitEnumeratorContext.java:85) at java.util.HashMap.forEach(HashMap.java:1290) at org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplitEnumerator.assignSplit(RocketMqSourceSplitEnumerator.java:354) at org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplitEnumerator.addSplitsBack(RocketMqSourceSplitEnumerator.java:140) at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.addSplitsBack(SourceSplitEnumeratorTask.java:216) at org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation.lambda$runInternal$365(RestoredSplitOperation.java:103) at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) at org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation.runInternal(RestoredSplitOperation.java:87) at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42) at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) at ------ submitted from ------.() at com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336) at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112) ... 21 more at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:398) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:195) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.10-SNAPSHOT] ``` ### Zeta or Flink or Spark Version Zeta ### Java or Scala Version 1.8 ### Screenshots _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org