deamonJohn opened a new issue, #9063:
URL: https://github.com/apache/seatunnel/issues/9063

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   
   A MySQL-CDC to Hudi sync job submitted to Spark (via the script below) fails as soon as the first snapshot split finishes reading, with:

   UnsupportedOperationException: Flink ParallelSource don't support sending SourceEvent. Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.
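
   For reference, the message asks for the `SupportCoordinate` marker interface on the source. Below is only an illustrative snippet for checking whether a given source class already carries that marker; the fully-qualified class name of the MySQL-CDC source is passed in as an argument because it is not shown in the error itself.

   ```java
   // Illustrative check only: reports whether a source class implements the
   // SupportCoordinate marker interface named in the error message.
   // Run with the seatunnel-api jar (and the connector jar) on the classpath,
   // passing the fully-qualified source class name as the first argument.
   import org.apache.seatunnel.api.source.SupportCoordinate;

   public class SupportCoordinateCheck {
       public static void main(String[] args) throws Exception {
           Class<?> sourceClass = Class.forName(args[0]);
           System.out.println(sourceClass.getName() + " implements SupportCoordinate: "
                   + SupportCoordinate.class.isAssignableFrom(sourceClass));
       }
   }
   ```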
   
   
   
   ### SeaTunnel Version
   
   2.3.9
   
   ### SeaTunnel Config
   
   My configuration template:

   ```conf
   env {
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 30000
   }
   
   source {
     Mysql-CDC {
       base-url = "jdbc:mysql://xxxxxx:3306/shucang"
       username = "xxxx"
       password = "xxxx"
       table-names = ["shucang.xxxx"]
     }
   }
   
   transform {
   }
   
   sink {
     Hudi {
       table_dfs_path = "s3a://xxxxx/data_sync/xx"
       database = "hudi_test"
       table_name = "tmp_seatunnel_sync_conf"
       table_type = "COPY_ON_WRITE"
       op_type="UPSERT"
       record_key_fields = "id"
       batch_size = 10000
       schema_save_mode= "CREATE_SCHEMA_WHEN_NOT_EXIST"
     }
   }
   ```
   
   ### Running Command
   
   Command executed:

   ```shell
   start-seatunnel-spark-3-connector-v2.sh \
     --config /data/dolphinscheduler/worker-server/data//exec/process/hadoop/134438271308768/136733097342912_26/483/718/seatunnel_483_718.conf \
     --deploy-mode cluster \
     --master yarn
   ```
   
   ### Error Exception
   
   Result:

   ```log
   25/03/27 02:55:48 INFO MySqlSnapshotSplitReadTask: Exporting data from split 
'ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0' of table 
ad_shucang.tmp_seatunnel_sync_pici_plan_conf
   25/03/27 02:55:48 INFO MySqlSnapshotSplitReadTask: For split 
'ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0' of table 
ad_shucang.tmp_seatunnel_sync_pici_plan_conf using select statement: 'SELECT * 
FROM `ad_shucang`.`tmp_seatunnel_sync_pici_plan_conf`'
   25/03/27 02:55:49 INFO MetricsConfig: Loaded properties from 
hadoop-metrics2.properties
   25/03/27 02:55:49 INFO MetricsSystemImpl: Scheduled Metric snapshot period 
at 300 second(s).
   25/03/27 02:55:49 INFO MetricsSystemImpl: s3a-file-system metrics system 
started
   SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
   SLF4J: Defaulting to no-operation (NOP) logger implementation
   SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
   25/03/27 02:55:49 WARN S3AFileSystem: Getting region for bucket 
hungry-studio-tmp from S3, this will slow down FS initialisation. To avoid 
this, set the region using property 
fs.s3a.bucket.hungry-studio-tmp.endpoint.region
   25/03/27 02:55:50 INFO MySqlSnapshotSplitReadTask: Finished exporting 5643 
records for split 'ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0', total 
duration '00:00:02.164'
   25/03/27 02:55:50 INFO MySqlSnapshotSplitReadTask: Snapshot step 3 - 
Determining high watermark {ts_sec=0, file=mysql-bin.009142, pos=729170768, 
gtids=4402e7c4-1904-11eb-81ae-525400652695:1-736682722,
   44090e1f-1904-11eb-81ae-5254009a5295:1-4680275586,
   e3847b20-ee33-11e9-b422-525400e2a2c1:421037908-421596888, row=0, event=0} 
for split SnapshotSplit(tableId=ad_shucang.tmp_seatunnel_sync_pici_plan_conf, 
splitKeyType=ROW<id BIGINT>, splitStart=null, splitEnd=null, lowWatermark=null, 
highWatermark=null)
   25/03/27 02:55:50 INFO SplitFetcher: Finished reading from splits 
[ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0]
   25/03/27 02:55:50 INFO SourceReaderBase: Finished reading split(s) 
[ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0]
   25/03/27 02:55:50 ERROR ParallelBatchPartitionReader: BatchPartitionReader 
execute failed.
   java.lang.UnsupportedOperationException: Flink ParallelSource don't support 
sending SourceEvent. Please implement the `SupportCoordinate` marker interface 
on the SeaTunnel source.
        at 
org.apache.seatunnel.translation.source.ParallelReaderContext.sendSourceEventToEnumerator(ParallelReaderContext.java:68)
 ~[__app__.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.reportFinishedSnapshotSplitsIfNeed(IncrementalSourceReader.java:196)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:178)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:212)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:188)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.translation.source.ParallelSource.run(ParallelSource.java:144)
 ~[__app__.jar:2.3.9]
        at 
org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader.lambda$prepare$0(ParallelBatchPartitionReader.java:117)
 ~[__app__.jar:2.3.9]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_442]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_442]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 ~[?:1.8.0_442]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 ~[?:1.8.0_442]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_442]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_442]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_442]
   25/03/27 02:55:50 ERROR Utils: Aborting task
   java.lang.RuntimeException: java.lang.UnsupportedOperationException: Flink 
ParallelSource don't support sending SourceEvent. Please implement the 
`SupportCoordinate` marker interface on the SeaTunnel source.
        at 
org.apache.seatunnel.translation.spark.source.partition.batch.SeaTunnelBatchPartitionReader.next(SeaTunnelBatchPartitionReader.java:38)
 ~[__app__.jar:2.3.9]
        at 
org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at scala.Option.exists(Option.scala:376) ~[scala-library-2.12.15.jar:?]
        at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 
~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
~[scala-library-2.12.15.jar:?]
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) ~[?:?]
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown
 Source) ~[?:?]
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:959)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:464)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
 ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
 ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) 
~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) 
~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.scheduler.Task.run(Task.scala:141) 
~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563)
 ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) 
~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) 
~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_442]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_442]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_442]
   Caused by: java.lang.UnsupportedOperationException: Flink ParallelSource 
don't support sending SourceEvent. Please implement the `SupportCoordinate` 
marker interface on the SeaTunnel source.
        at 
org.apache.seatunnel.translation.source.ParallelReaderContext.sendSourceEventToEnumerator(ParallelReaderContext.java:68)
 ~[__app__.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.reportFinishedSnapshotSplitsIfNeed(IncrementalSourceReader.java:196)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:178)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:212)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:188)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
 ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
        at 
org.apache.seatunnel.translation.source.ParallelSource.run(ParallelSource.java:144)
 ~[__app__.jar:2.3.9]
        at 
org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader.lambda$prepare$0(ParallelBatchPartitionReader.java:117)
 ~[__app__.jar:2.3.9]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_442]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_442]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 ~[?:1.8.0_442]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 ~[?:1.8.0_442]
        ... 3 more
   25/03/27 02:55:50 ERROR DataWritingSparkTask: Aborting commit for partition 
0 (task 0, attempt 0, stage 0.0)
   25/03/27 02:55:50 ERROR DataWritingSparkTask: Aborted commit for partition 0 
(task 0, attempt 0, stage 0.0)
   25/03/27 02:55:50 INFO IncrementalSourceEnumerator: Closing enumerator...
   25/03/27 02:55:50 INFO SourceReaderBase: Closing Source Reader 0.
   25/03/27 02:55:50 INFO SplitFetcher: Shutting down split fetcher 0
   25/03/27 02:55:50 INFO IncrementalSourceSplitReader: Close current fetcher 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
   25/03/27 02:55:51 INFO JdbcConnection: Connection gracefully closed
   25/03/27 02:55:51 INFO SplitFetcher: Split fetcher 0 exited.
   25/03/27 02:55:51 INFO LoggingEventHandler: log event: 
ReaderCloseEvent(createdTime=1743044151166, 
jobId=application_1740982084970_0189, eventType=LIFECYCLE_READER_CLOSE)
   25/03/27 02:55:51 INFO LoggingEventHandler: log event: 
EnumeratorCloseEvent(createdTime=1743044151167, 
jobId=application_1740982084970_0189, eventType=LIFECYCLE_ENUMERATOR_CLOSE)
   ```
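
   The top frame in both stack traces is `ParallelReaderContext.sendSourceEventToEnumerator` (ParallelReaderContext.java:68). A minimal sketch of the kind of guard that would produce this message is shown below; it is inferred from the error text only and is not a copy of the actual SeaTunnel implementation.

   ```java
   // Sketch inferred from the error message and the top stack frame; the real
   // ParallelReaderContext implements the full reader Context interface.
   import org.apache.seatunnel.api.source.SourceEvent;

   public class ParallelReaderContextSketch {
       public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
           // A parallel (non-coordinated) reader context cannot route events to the
           // enumerator, so the call is rejected outright.
           throw new UnsupportedOperationException(
                   "Flink ParallelSource don't support sending SourceEvent. "
                           + "Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.");
       }
   }
   ```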
   
   ### Zeta or Flink or Spark Version
   
   Spark 3.4.1
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

