xxyykkxx opened a new issue, #9129:
URL: https://github.com/apache/seatunnel/issues/9129

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   When RocketMQ Job running a few days,the job throws 
`java.lang.NullPointerException`
   
   ### SeaTunnel Version
   
   2.3.9
   
   ### SeaTunnel Config
   
   ```conf
   {
       "env": {
           "job.mode": "streaming",
           "checkpoint.interval": "8000",
           "parallelism": 80
       },
       "source": [
           {
               "plugin_name": "Rocketmq",
               "plugin_output": "source_tabl1",
               "name.srv.addr": "localhost:9876",
               "format": "json",
               "consumer.group": "tpws-seatunnel-source-record-topic-consumer",
               "topics": "tpws-op-new-record-topic",
               "batch.size": 600,
               "schema": {
                   "fields": {
                       "merged": "map<string,string>"
                   }
               }
           }
       ],
       "transform": [
           {
               "plugin_name": "Sql",
               "plugin_input": "source_tabl1",
               "plugin_output": "source_table2",
               "query": "select merged.uniqueNo unique_no,merged.orderNo 
order_no,merged.orderChannel order_channel,merged.accessDate 
access_date,merged.createTime create_time,merged.shopCode 
shop_code,merged.shopType shop_type,merged.shopName shop_name,merged.sellerFlag 
seller_flag,merged.promiseSendTime promise_send_time,merged.orderStatus 
order_status,merged.payAmount pay_amount,merged.payTime 
pay_time,SUBSTR(merged.payTime,0,10) pay_date,merged.orderSendTime 
order_send_time,SUBSTR(merged.orderSendTime,0,10) 
order_send_date,merged.takingTime taking_time,merged.sdsPlanTime 
sds_plan_time,merged.sdsTpwCode sds_tpw_code,merged.sdsTpwStatus 
sds_tpw_status,merged.sdsEstimateAmt sds_estimate_amt,merged.sdsIgnore 
sds_ignore,merged.sotPlanTime sot_plan_time,merged.sotTpwCode 
sot_tpw_code,merged.sotTpwStatus sot_tpw_status,merged.sotEstimateAmt 
sot_estimate_amt,merged.sotIgnore sot_ignore,merged.takingPlanTime 
taking_plan_time,merged.totTpwCode tot_tpw_code,merged.totTpwStatus 
tot_tpw_statu
 s,merged.totEstimateAmt tot_estimate_amt,merged.totIgnore 
tot_ignore,merged.tnuPlanTime tnu_plan_time,merged.tnuTpwCode 
tnu_tpw_code,merged.tnuStatus tnu_status,merged.tnuIgnore 
tnu_ignore,merged.tnuEstimateAmt tnu_estimate_amt,merged.tnuAfterTime 
tnu_after_time,merged.sdfPlanTime sdf_plan_time,merged.sdfTpwCode 
sdf_tpw_code,merged.sdfStatus sdf_status,merged.centerOptTime 
center_opt_time,merged.takingOrgCode taking_org_code,merged.takingOrgName 
taking_org_name,merged.takingOrgType taking_org_type,merged.takingManageCode 
taking_manage_code,merged.takingManageName 
taking_manage_name,merged.takingProvCode taking_prov_code,merged.takingProvName 
taking_prov_name,merged.takingAreaCode taking_area_code,merged.takingAreaName 
taking_area_name,merged.oudPlanTime tnh_plan_time,merged.oudTpwCode 
tnh_tpw_code,merged.oudStatus tnh_status,merged.handonTime 
handon_time,merged.unloadTime unload_time,merged.unloadOrgCode 
unload_org_code,merged.unloadOrgName unload_org_name,merged.ldcPlanTime ldc_pla
 n_time,merged.ldcTpwCode ldc_tpw_code,merged.ldcStatus 
ldc_status,merged.realSignTime real_sign_time,merged.ldcOrgCode 
ldc_org_code,merged.ldcOrgName ldc_org_name,merged.ldcOrgType 
ldc_org_type,merged.ldcManageCode ldc_manage_code,merged.ldcManageName 
ldc_manage_name,merged.ldcProvCode ldc_prov_code,merged.ldcProvName 
ldc_prov_name,merged.ldcAreaCode ldc_area_code,merged.ldcAreaName 
ldc_area_name,merged.achievePreTime achieve_pre_time,merged.lroPreTime 
lro_pre_time,merged.obtPlanTime obt_plan_time,merged.obtTpwCode 
obt_tpw_code,merged.obtTpwStatus obt_status,merged.stageTime 
stage_time,merged.sellerName seller_name,merged.sellerCode 
seller_code,merged.customerCode customer_code,merged.customerName 
customer_name,merged.arrivalManageCode 
arrival_manage_code,merged.arrivalManageName 
arrival_manage_name,merged.arrivalProvCode 
arrival_prov_code,merged.arrivalProvName 
arrival_prov_name,merged.arrivalAreaCode 
arrival_area_code,merged.arrivalAreaName arrival_area_name,merged.allotOrgCode 
al
 lot_org_code,merged.allotOrgType allot_org_type,merged.allotOrgName 
allot_org_name,merged.destOrgCode dest_org_code,merged.destOrgName 
dest_org_name,merged.destOrgType dest_org_type,merged.outManageCode 
out_manage_code,merged.outManageName out_manage_name,merged.outProvCode 
out_prov_code,merged.outProvName out_prov_name,merged.outAreaCode 
out_area_code,merged.outAreaName out_area_name,merged.orderNoType 
order_no_type,merged.ecvlTime ecvl_time,SUBSTR(merged.ecvlTime,0,10) 
ecvl_date,merged.unloadTime endo_time,SUBSTR(merged.unloadTime,0,10) 
endo_date,merged.timestamp version from source_tabl1"
           }
       ],
       "sink": [
           {
               "plugin_name": "Doris",
               "plugin_input": "source_table2",
               "doris.config": {
                   "format": "json",
                   "read_json_by_line": "true"
               },
               "fenodes": "localhost:8030",
               "password": "12345",
               "username": "aa",
               "database": "bb",
               "table": "cc",
               "doris.batch.size": 8192,
               "sink.buffer-count": 8,
               "sink.buffer-size": 2097152
           }
       ]
   }
   ```
   
   ### Running Command
   
   ```shell
   bin/seatunnel.sh
   ```
   
   ### Error Exception
   
   ```log
   2025-04-08 20:50:18,483 ERROR [.s.e.s.c.CheckpointCoordinator] 
[hz.main.generic-operation.thread-36] - report error from task
   org.apache.seatunnel.common.utils.SeaTunnelException: 
java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
java.lang.NullPointerException
           at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.restoreState(SourceFlowLifeCycle.java:378)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$231(SeaTunnelTask.java:426)
           at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
           at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
           at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
           at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
           at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
           at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
           at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
           at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
           at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
           at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
           at 
org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$385(NotifyTaskRestoreOperation.java:107)
           at 
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
           at 
org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   Caused by: java.util.concurrent.ExecutionException: 
java.lang.NullPointerException
           at 
com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121)
           at 
com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100)
           at 
com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617)
           at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.restoreState(SourceFlowLifeCycle.java:375)
           ... 18 more
   Caused by: java.lang.NullPointerException
           at 
java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
           at 
org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.getTaskMemberAddressByIndex(SourceSplitEnumeratorTask.java:263)
           at 
org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext.assignSplit(SeaTunnelSplitEnumeratorContext.java:85)
           at java.util.HashMap.forEach(HashMap.java:1290)
           at 
org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplitEnumerator.assignSplit(RocketMqSourceSplitEnumerator.java:354)
           at 
org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplitEnumerator.addSplitsBack(RocketMqSourceSplitEnumerator.java:140)
           at 
org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.addSplitsBack(SourceSplitEnumeratorTask.java:216)
           at 
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation.lambda$runInternal$365(RestoredSplitOperation.java:103)
           at 
org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
           at 
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation.runInternal(RestoredSplitOperation.java:87)
           at 
org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
           at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
           at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
           at ------ submitted from ------.()
           at 
com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336)
           at 
com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112)
           ... 21 more
   
           at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:398)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:195)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) 
~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
           at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
 ~[seatunnel-starter.jar:2.3.10-SNAPSHOT]
   ```
   
   ### Zeta or Flink or Spark Version
   
   Zeta
   
   ### Java or Scala Version
   
   1.8
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to