lm-ylj opened a new issue, #8586:
URL: https://github.com/apache/seatunnel/issues/8586

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   MySQL deletes binlog files automatically once they expire; on Tencent Cloud the default retention is 120 hours. Suppose the data in the captured table is not modified for longer than that (for example, 7 days) while data is still being written to MySQL. If the task then fails because of a connection interruption or some other reason, SeaTunnel restarts it automatically and restores from the checkpoint. However, the checkpoint only updates startupOffset when a data change occurs, so it still records the binlog file and position from a long time ago. By then that binlog file has been deleted by the system, and because it is missing during the restart, the task cannot be restored normally.
   The root cause is that startupOffset is only updated for data change records in IncrementalSourceRecordEmitter#processElement:
   ```java
       // Only data changes will update startupOffset
       protected void processElement(
               SourceRecord element, Collector<T> output, SourceSplitStateBase splitState)
               throws Exception {
           if (isWatermarkEvent(element)) {
               Offset watermark = getWatermark(element);
               if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                   splitState.asSnapshotSplitState().setLowWatermark(watermark);
               } else if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                   splitState.asSnapshotSplitState().setHighWatermark(watermark);
               } else if ((isSchemaChangeBeforeWatermarkEvent(element)
                               || isSchemaChangeAfterWatermarkEvent(element))
                       && splitState.isIncrementalSplitState()) {
                   emitElement(element, output);
               }
           } else if (isSchemaChangeEvent(element) && splitState.isIncrementalSplitState()) {
               emitElement(element, output);
           } else if (isDataChangeRecord(element)) {
               if (splitState.isIncrementalSplitState()) {
                   Offset position = getOffsetPosition(element);
                   splitState.asIncrementalSplitState().setStartupOffset(position);
               }
               emitElement(element, output);
           } else {
               emitElement(element, output);
           }
       }
   ```
   To solve this problem, startupOffset should also be updated when binlog heartbeat events are received.
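
The proposed change could look roughly like the following self-contained sketch. It is not the SeaTunnel implementation: `Offset`, `Kind`, `Event`, `SplitState`, and `Emitter` are hypothetical stand-ins for the real classes, the binlog file name `mysql-bin.001600` is made up for illustration, and it assumes heartbeat events carry a binlog position that can be read the same way as for data change records.

```java
import java.util.ArrayList;
import java.util.List;

public class HeartbeatOffsetSketch {

    /** Simplified stand-in for a binlog offset: file name plus position. */
    static final class Offset {
        final String file;
        final long pos;

        Offset(String file, long pos) {
            this.file = file;
            this.pos = pos;
        }
    }

    /** Hypothetical event kinds; the real code inspects a SourceRecord instead. */
    enum Kind { DATA_CHANGE, HEARTBEAT, OTHER }

    static final class Event {
        final Kind kind;
        final Offset offset;
        final String payload;

        Event(Kind kind, Offset offset, String payload) {
            this.kind = kind;
            this.offset = offset;
            this.payload = payload;
        }
    }

    /** Stand-in for the incremental split state that is stored in a checkpoint. */
    static final class SplitState {
        private Offset startupOffset;

        SplitState(Offset initial) {
            this.startupOffset = initial;
        }

        Offset getStartupOffset() {
            return startupOffset;
        }

        void setStartupOffset(Offset offset) {
            this.startupOffset = offset;
        }
    }

    static final class Emitter {
        final List<String> emitted = new ArrayList<>();

        void processElement(Event element, SplitState state) {
            if (element.kind == Kind.DATA_CHANGE) {
                // Existing behavior: data changes advance the startup offset.
                state.setStartupOffset(element.offset);
                emitted.add(element.payload);
            } else if (element.kind == Kind.HEARTBEAT) {
                // Proposed addition: heartbeats advance the offset as well,
                // so an idle table no longer pins the checkpoint to an old binlog file.
                state.setStartupOffset(element.offset);
                emitted.add(element.payload); // still forwarded, as in the current else branch
            } else {
                emitted.add(element.payload);
            }
        }
    }

    public static void main(String[] args) {
        SplitState state = new SplitState(new Offset("mysql-bin.001566", 177960575L));
        Emitter emitter = new Emitter();

        // No data change arrives, only a heartbeat from a newer (illustrative) binlog file.
        emitter.processElement(
                new Event(Kind.HEARTBEAT, new Offset("mysql-bin.001600", 4L), "heartbeat"), state);

        Offset checkpointed = state.getStartupOffset();
        System.out.println(checkpointed.file + ":" + checkpointed.pos);
    }
}
```

With this branch in place, the next checkpoint would record the heartbeat's position rather than the position of the last data change, so a restart would not need the expired binlog file.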
   
   ### SeaTunnel Version
   
   2.3.7
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 30000
   }
   
   source {
     MySQL-CDC {
       base-url = "jdbc:mysql://xxx/xxx"
       username = "xxx"
       password = "xxx"
       table-names = ["xxx"]
       startup.mode = "initial"
     }
   }
   
   sink {
    Redis {
       host = "xxx"
       port = xxx
       auth = "xxx"
       key = "xxx"
       data_type = "key"
       db_num = 3
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   bin/seatunnel.sh --config task-config/mysqlcdc-to-redis.conf --async -n mysqlcdc-to-redis.conf
   ```
   
   ### Error Exception
   
   ```log
   2025-01-22 16:03:08,401 ERROR [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.generic-operation.thread-32] - Job pay_blacklist.conf (928945040787505157), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-MySQL-CDC]-SourceTask (1/1)] end with state FAILED and Exception: java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:721)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1043)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
        ... 5 more
   Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1737532988174,db=,server_id=0,file=mysql-bin.001566,pos=177960575,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext.loadStartingOffsetState(MySqlSourceFetchTaskContext.java:281)
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext.configure(MySqlSourceFetchTaskContext.java:127)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:97)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
        ... 6 more
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

