lm-ylj opened a new issue, #8586:
URL: https://github.com/apache/seatunnel/issues/8586
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

MySQL binlog files are deleted automatically by the system once they expire. On Tencent Cloud the default retention is 120 hours.

Suppose the data in the captured table has not been modified for longer than that (for example, 7 days) while data is still being written to MySQL. If the task then fails because of a connection interruption or some other reason, SeaTunnel restarts it automatically and recovers from the checkpoint. However, the checkpoint only updates `startupOffset` when a data change occurs, so it still records the binlog file and position from a long time ago. By then that binlog file has been deleted by the system, and because the binlog is missing during the restart, the task cannot be restored.

The root cause is that `IncrementalSourceRecordEmitter#processElement` updates `startupOffset` only for data change records:

```java
// Only data changes will update startupOffset
protected void processElement(
        SourceRecord element, Collector<T> output, SourceSplitStateBase splitState)
        throws Exception {
    if (isWatermarkEvent(element)) {
        Offset watermark = getWatermark(element);
        if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
            splitState.asSnapshotSplitState().setLowWatermark(watermark);
        } else if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
            splitState.asSnapshotSplitState().setHighWatermark(watermark);
        } else if ((isSchemaChangeBeforeWatermarkEvent(element)
                        || isSchemaChangeAfterWatermarkEvent(element))
                && splitState.isIncrementalSplitState()) {
            emitElement(element, output);
        }
    } else if (isSchemaChangeEvent(element) && splitState.isIncrementalSplitState()) {
        emitElement(element, output);
    } else if (isDataChangeRecord(element)) {
        if (splitState.isIncrementalSplitState()) {
            Offset position = getOffsetPosition(element);
            splitState.asIncrementalSplitState().setStartupOffset(position);
        }
        emitElement(element, output);
    } else {
        emitElement(element, output);
    }
}
```

To solve this problem, `startupOffset` should also be updated when a binlog heartbeat event is received.

### SeaTunnel Version

2.3.7

### SeaTunnel Config

```conf
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 30000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://xxx/xxx"
    username = "xxx"
    password = "xxx"
    table-names = ["xxx"]
    startup.mode = "initial"
  }
}

sink {
  Redis {
    host = "xxx"
    port = xxx
    auth = "xxx"
    key = "xxx"
    data_type = "key"
    db_num = 3
  }
}
```

### Running Command

```shell
bin/seatunnel.sh --config task-config/mysqlcdc-to-redis.conf --async -n mysqlcdc-to-redis.conf
```

### Error Exception

```log
2025-01-22 16:03:08,401 ERROR [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.generic-operation.thread-32] - Job pay_blacklist.conf (928945040787505157), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-MySQL-CDC]-SourceTask (1/1)] end with state FAILED and Exception: java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:721)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1043)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
	... 5 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1737532988174,db=,server_id=0,file=mysql-bin.001566,pos=177960575,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
	at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext.loadStartingOffsetState(MySqlSourceFetchTaskContext.java:281)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext.configure(MySqlSourceFetchTaskContext.java:127)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:97)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	... 6 more
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
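
The fix proposed under "What happened" can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `BinlogOffset`, `ChangeEvent`, `SplitState`, and `Kind` are hypothetical stand-ins rather than SeaTunnel classes, the binlog file names are made up, and it assumes heartbeat events reach the emitter carrying their own binlog offset. Whether heartbeat events should be forwarded downstream is left open; this sketch drops them.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the proposed fix; not SeaTunnel code.
class HeartbeatOffsetSketch {

    enum Kind { DATA_CHANGE, HEARTBEAT }

    // Stand-in for a binlog offset: file name plus position.
    static final class BinlogOffset {
        final String file;
        final long pos;

        BinlogOffset(String file, long pos) {
            this.file = file;
            this.pos = pos;
        }

        @Override
        public String toString() {
            return file + ":" + pos;
        }
    }

    // Stand-in for a record read from the binlog.
    static final class ChangeEvent {
        final Kind kind;
        final BinlogOffset offset;
        final String payload; // null for heartbeats

        ChangeEvent(Kind kind, BinlogOffset offset, String payload) {
            this.kind = kind;
            this.offset = offset;
            this.payload = payload;
        }
    }

    // Stand-in for the incremental split state saved in a checkpoint.
    static final class SplitState {
        BinlogOffset startupOffset;

        SplitState(BinlogOffset initial) {
            this.startupOffset = initial;
        }
    }

    static void processElement(ChangeEvent element, List<String> output, SplitState state) {
        if (element.kind == Kind.HEARTBEAT) {
            // Assumed fix: a heartbeat advances the checkpointed offset
            // even though no row in the captured table changed.
            state.startupOffset = element.offset;
        } else {
            // Existing behaviour: data changes advance the offset and are emitted.
            state.startupOffset = element.offset;
            output.add(element.payload);
        }
    }

    public static void main(String[] args) {
        SplitState state = new SplitState(new BinlogOffset("mysql-bin.000001", 4L));
        List<String> output = new ArrayList<>();

        processElement(
                new ChangeEvent(Kind.DATA_CHANGE, new BinlogOffset("mysql-bin.000001", 200L), "row-1"),
                output, state);
        // Days later the table is idle, but a heartbeat arrives from a newer binlog file.
        processElement(
                new ChangeEvent(Kind.HEARTBEAT, new BinlogOffset("mysql-bin.000009", 120L), null),
                output, state);

        System.out.println("checkpointed offset: " + state.startupOffset);
        System.out.println("emitted records: " + output);
    }
}
```

With this behaviour, a checkpoint taken while the table is idle would record the offset of the latest heartbeat, so a restart would resume from a recent binlog file rather than one that may already have been purged.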