AmeliaCcc opened a new issue, #9078:
URL: https://github.com/apache/seatunnel/issues/9078

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   When using SeaTunnel to capture MySQL CDC data that contains DATE type 
columns and writing to PostgreSQL, the process fails with type conversion 
errors. The connector cannot properly handle the DATE type representation that 
comes from Debezium.
   
   ### SeaTunnel Version
   
   2.3.9
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = "STREAMING"
   }
   
   source {
     MySQL-CDC {
       
base-url="jdbc:mysql://127.0.0.1:3306/mysql_database?useUnicode=true&characterEncoding=utf8"
       username = root
       password = password
       table-names = ["mysql_database.table"]
       startup.mode = "initial"
       connectionTimeZone = "Asia/Shanghai"
       schema-changes.enabled = true
     }
   }
   
   sink {
     Jdbc {    
       driver = org.postgresql.Driver
       url = 
"jdbc:postgresql://127.0.0.1:10000/postgresql_database?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
       user = "user"
       password = "password"
   
       database = "postgresql_database"
       table = "public.table"
       generate_sink_sql = true
       primary_keys = ["id"]
       support_upsert_by_query_primary_key_exist = true
       enable_upsert = true
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   ./bin/seatunnel.sh --config ./job/xxx.conf -m local
   ```
   
   ### Error Exception
   
   ```log
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: 
CheckpointCoordinator inside have error.
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:282)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:278)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:391)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:182)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
        at 
org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
        at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: 
org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: 
ErrorCode:[COMMON-20], ErrorDescription:['Postgres' table 
'ministore_center_device.public.device_static_information' unsupported get 
catalog table with field data types '{"delivery_date":"oradate"}']
        at 
org.apache.seatunnel.common.exception.CommonError.getCatalogTableWithUnsupportedType(CommonError.java:174)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.buildColumnsWithErrorCheck(AbstractJdbcCatalog.java:231)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.buildColumnsReturnTablaSchemaBuilder(AbstractJdbcCatalog.java:209)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.getTable(AbstractJdbcCatalog.java:182)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink.getDatabaseTableSchema(JdbcSink.java:195)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink.createWriter(JdbcSink.java:140)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink.createWriter(JdbcSink.java:66)
        at 
org.apache.seatunnel.api.sink.multitablesink.MultiTableSink.createWriter(MultiTableSink.java:80)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:342)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
        at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:107)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
   ```
   
   ### Zeta or Flink or Spark Version
   
   Zeta
   
   ### Java or Scala Version
   
   openjdk 17.0.14
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to