deamonJohn opened a new issue, #9063: URL: https://github.com/apache/seatunnel/issues/9063
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

UnsupportedOperationException: Flink ParallelSource don't support sending SourceEvent. Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.

### SeaTunnel Version

2.3.9

### SeaTunnel Config

```conf
# My config template:
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 30000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://xxxxxx:3306/shucang"
    username = "xxxx"
    password = "xxxx"
    table-names = ["shucang.xxxx"]
  }
}

transform {
}

sink {
  Hudi {
    table_dfs_path = "s3a://xxxxx/data_sync/xx"
    database = "hudi_test"
    table_name = "tmp_seatunnel_sync_conf"
    table_type = "COPY_ON_WRITE"
    op_type = "UPSERT"
    record_key_fields = "id"
    batch_size = 10000
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
  }
}
```

### Running Command

```shell
# Command executed:
start-seatunnel-spark-3-connector-v2.sh --config /data/dolphinscheduler/worker-server/data//exec/process/hadoop/134438271308768/136733097342912_26/483/718/seatunnel_483_718.conf --deploy-mode cluster --master yarn
```

### Error Exception

```log
Result:
25/03/27 02:55:48 INFO MySqlSnapshotSplitReadTask: Exporting data from split 'ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0' of table ad_shucang.tmp_seatunnel_sync_pici_plan_conf
25/03/27 02:55:48 INFO MySqlSnapshotSplitReadTask: For split 'ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0' of table ad_shucang.tmp_seatunnel_sync_pici_plan_conf using select statement: 'SELECT * FROM `ad_shucang`.`tmp_seatunnel_sync_pici_plan_conf`'
25/03/27 02:55:49 INFO MetricsConfig: Loaded properties from hadoop-metrics2.properties
25/03/27 02:55:49 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 300 second(s).
25/03/27 02:55:49 INFO MetricsSystemImpl: s3a-file-system metrics system started
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
25/03/27 02:55:49 WARN S3AFileSystem: Getting region for bucket hungry-studio-tmp from S3, this will slow down FS initialisation. To avoid this, set the region using property fs.s3a.bucket.hungry-studio-tmp.endpoint.region
25/03/27 02:55:50 INFO MySqlSnapshotSplitReadTask: Finished exporting 5643 records for split 'ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0', total duration '00:00:02.164'
25/03/27 02:55:50 INFO MySqlSnapshotSplitReadTask: Snapshot step 3 - Determining high watermark {ts_sec=0, file=mysql-bin.009142, pos=729170768, gtids=4402e7c4-1904-11eb-81ae-525400652695:1-736682722, 44090e1f-1904-11eb-81ae-5254009a5295:1-4680275586, e3847b20-ee33-11e9-b422-525400e2a2c1:421037908-421596888, row=0, event=0} for split SnapshotSplit(tableId=ad_shucang.tmp_seatunnel_sync_pici_plan_conf, splitKeyType=ROW<id BIGINT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)
25/03/27 02:55:50 INFO SplitFetcher: Finished reading from splits [ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0]
25/03/27 02:55:50 INFO SourceReaderBase: Finished reading split(s) [ad_shucang.tmp_seatunnel_sync_pici_plan_conf:0]
25/03/27 02:55:50 ERROR ParallelBatchPartitionReader: BatchPartitionReader execute failed.
java.lang.UnsupportedOperationException: Flink ParallelSource don't support sending SourceEvent. Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.
    at org.apache.seatunnel.translation.source.ParallelReaderContext.sendSourceEventToEnumerator(ParallelReaderContext.java:68) ~[__app__.jar:2.3.9]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.reportFinishedSnapshotSplitsIfNeed(IncrementalSourceReader.java:196) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:178) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:212) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:188) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.translation.source.ParallelSource.run(ParallelSource.java:144) ~[__app__.jar:2.3.9]
    at org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader.lambda$prepare$0(ParallelBatchPartitionReader.java:117) ~[__app__.jar:2.3.9]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_442]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_442]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_442]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_442]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_442]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_442]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_442]
25/03/27 02:55:50 ERROR Utils: Aborting task
java.lang.RuntimeException: java.lang.UnsupportedOperationException: Flink ParallelSource don't support sending SourceEvent. Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.
    at org.apache.seatunnel.translation.spark.source.partition.batch.SeaTunnelBatchPartitionReader.next(SeaTunnelBatchPartitionReader.java:38) ~[__app__.jar:2.3.9]
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at scala.Option.exists(Option.scala:376) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) ~[?:?]
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:959) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:464) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_442]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_442]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_442]
Caused by: java.lang.UnsupportedOperationException: Flink ParallelSource don't support sending SourceEvent. Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.
    at org.apache.seatunnel.translation.source.ParallelReaderContext.sendSourceEventToEnumerator(ParallelReaderContext.java:68) ~[__app__.jar:2.3.9]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.reportFinishedSnapshotSplitsIfNeed(IncrementalSourceReader.java:196) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:178) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:212) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:188) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[connector-cdc-mysql-2.3.9.jar:2.3.9]
    at org.apache.seatunnel.translation.source.ParallelSource.run(ParallelSource.java:144) ~[__app__.jar:2.3.9]
    at org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader.lambda$prepare$0(ParallelBatchPartitionReader.java:117) ~[__app__.jar:2.3.9]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_442]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_442]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_442]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_442]
    ... 3 more
25/03/27 02:55:50 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 0, attempt 0, stage 0.0)
25/03/27 02:55:50 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 0, attempt 0, stage 0.0)
25/03/27 02:55:50 INFO IncrementalSourceEnumerator: Closing enumerator...
25/03/27 02:55:50 INFO SourceReaderBase: Closing Source Reader 0.
25/03/27 02:55:50 INFO SplitFetcher: Shutting down split fetcher 0
25/03/27 02:55:50 INFO IncrementalSourceSplitReader: Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
25/03/27 02:55:51 INFO JdbcConnection: Connection gracefully closed
25/03/27 02:55:51 INFO SplitFetcher: Split fetcher 0 exited.
25/03/27 02:55:51 INFO LoggingEventHandler: log event: ReaderCloseEvent(createdTime=1743044151166, jobId=application_1740982084970_0189, eventType=LIFECYCLE_READER_CLOSE)
25/03/27 02:55:51 INFO LoggingEventHandler: log event: EnumeratorCloseEvent(createdTime=1743044151167, jobId=application_1740982084970_0189, eventType=LIFECYCLE_ENUMERATOR_CLOSE)
```

### Zeta or Flink or Spark Version

Spark 3.4.1

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.