tusiCHN opened a new issue, #8924:
URL: https://github.com/apache/seatunnel/issues/8924

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
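   When a MySQL-CDC source reads multiple tables and the Metadata transform is added, writing to Kafka fails with the error shown under "Error Exception".
   
   The following combinations all work normally:
   
   1. MySQL multiple tables, without the Metadata transform, sent to Kafka.
   2. MySQL single table, with the Metadata transform, sent to Kafka.
   3. MySQL multiple tables, with the Metadata transform, sent to the console or to local files.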
   
   
   ### SeaTunnel Version
   
   2.3.9
   
   ### SeaTunnel Config
   
   ```conf
   {
       "env" : {
           "execution.parallelism" : 1,
           "job.mode" : "STREAMING",
           "checkpoint.interval" : 5000,
           "read_limit.bytes_per_second" : 7000000,
           "read_limit.rows_per_second" : 400
       },
       "source" : [
           {
               "plugin_output" : "customers_mysql_cdc",
               "server-id" : 5652,
               "base-url" : "jdbc:mysql://xxxx:3306/seatunnel?useSSL=false&allowPublicKeyRetrieval=true",
               "username" : "******",
               "password" : "******",
               "table-names" : [
                   "seatunnel.role",
                   "seatunnel.user"
               ],
               "startup.mode" : "initial",
               "plugin_name" : "MySQL-CDC"
           }
       ],
       "transform" : [
           {
               "metadata_fields" : {
                   "Database" : "database",
                   "Table" : "table",
                   "RowKind" : "rowKind",
                   "EventTime" : "ts_ms",
                   "Delay" : "delay"
               },
               "plugin_output" : "trans_result",
               "plugin_name" : "Metadata"
           }
       ],
       "sink" : [
           {
               "topic" : "xxxxxxxxxxx",
               "bootstrap.servers" : "xxxxxxxxxxxx",
               "format" : "json",
               "plugin_name" : "kafka"
           }
       ]
   }
   ```
   
   ### Running Command
   
   ```shell
   /data/tmp/apache-seatunnel-2.3.9/bin/seatunnel.sh --name mysql2kafka_database \
     --config /data/tmp/apache-seatunnel-2.3.9/task_config/mysql2kafka_cdc_database.config \
     -Xms16m -Xmx128m --async \
     > /data/tmp/apache-seatunnel-2.3.9/log/mysql2kafka_database.log 2>&1 &
   ```
   
   ### Error Exception
   
   ```log
   [950196515014443009] 2025-03-07 09:01:47,776 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=950196515014443009, pipelineId=1, taskGroupId=30000}] - [localhost]:5801 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@24bf3b8d
   java.lang.RuntimeException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[COMMON-02], ErrorDescription:[Common JSON convert/parse 'SeaTunnelRow{tableId=seatunnel.role, kind=+I, fields=[1, 0, ADMIN_ROLE, Admin User, 2024-12-27T16:16:02.382, 2024-12-27T16:16:02.382, seatunnel, role, +I, 1741309307016, 0]}' operation failed.]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43) ~[seatunnel-starter.jar:2.3.9]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_282]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_282]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[COMMON-02], ErrorDescription:[Common JSON convert/parse 'SeaTunnelRow{tableId=seatunnel.role, kind=+I, fields=[1, 0, ADMIN_ROLE, Admin User, 2024-12-27T16:16:02.382, 2024-12-27T16:16:02.382, seatunnel, role, +I, 1741309307016, 0]}' operation failed.]
        at org.apache.seatunnel.common.exception.CommonError.jsonOperationError(CommonError.java:204) ~[seatunnel-starter.jar:2.3.9]
        at org.apache.seatunnel.format.json.JsonSerializationSchema.serialize(JsonSerializationSchema.java:79) ~[connector-file-local-2.3.9.jar:2.3.9]
        at org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.lambda$valueExtractor$9(DefaultSeaTunnelRowSerializer.java:198) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.serializeRow(DefaultSeaTunnelRowSerializer.java:70) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:114) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:59) ~[?:?]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:268) ~[seatunnel-starter.jar:2.3.9]
        ... 17 more
   Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
        at org.apache.seatunnel.format.json.RowToJsonConverters$12.convert(RowToJsonConverters.java:159) ~[connector-file-local-2.3.9.jar:2.3.9]
        at org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73) ~[connector-file-local-2.3.9.jar:2.3.9]
        at org.apache.seatunnel.format.json.RowToJsonConverters$18.convert(RowToJsonConverters.java:235) ~[connector-file-local-2.3.9.jar:2.3.9]
        at org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73) ~[connector-file-local-2.3.9.jar:2.3.9]
        at org.apache.seatunnel.format.json.JsonSerializationSchema.serialize(JsonSerializationSchema.java:76) ~[connector-file-local-2.3.9.jar:2.3.9]
        at org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.lambda$valueExtractor$9(DefaultSeaTunnelRowSerializer.java:198) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.serializeRow(DefaultSeaTunnelRowSerializer.java:70) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:114) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:59) ~[?:?]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:268) ~[seatunnel-starter.jar:2.3.9]
        ... 17 more
   ```
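
The innermost cause in the trace is a `ClassCastException` (`java.lang.Integer cannot be cast to java.lang.String`) raised inside `RowToJsonConverters`. One possible reading, which is an assumption and not confirmed against the SeaTunnel source, is that the JSON serializer picks its per-field converters from a row type that does not match the row it receives, for example because the row type of another table is applied to a `seatunnel.role` row. The sketch below is not SeaTunnel code. The class `SchemaMismatchSketch`, its methods, and the column types are hypothetical and only illustrate how a converter chosen from a mismatched type list fails with this kind of exception.

```java
import java.util.Arrays;
import java.util.List;

public class SchemaMismatchSketch {

    // Hypothetical stand-in for a per-field converter selected from the
    // declared column type. It casts without checking the runtime type.
    static String convertField(String declaredType, Object value) {
        switch (declaredType) {
            case "STRING":
                // Throws ClassCastException if the value is really an Integer.
                return "\"" + (String) value + "\"";
            case "INT":
                return Integer.toString((Integer) value);
            default:
                throw new IllegalArgumentException("unsupported type: " + declaredType);
        }
    }

    // Serializes the row fields as a JSON array, one converter per position.
    static String toJson(List<String> declaredTypes, Object[] fields) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(convertField(declaredTypes.get(i), fields[i]));
        }
        return sb.append("]").toString();
    }

    public static void main(String[] args) {
        // First three field values of the failing row in the log.
        Object[] roleRow = {1, 0, "ADMIN_ROLE"};

        // Type list that matches the row (assumed types): serialization succeeds.
        System.out.println(toJson(Arrays.asList("INT", "INT", "STRING"), roleRow));

        // Type list taken from a different, hypothetical table: the second
        // position is declared STRING but holds an Integer.
        try {
            toJson(Arrays.asList("INT", "STRING", "STRING"), roleRow);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

If this reading is right, it would be consistent with the single-table job working, since only one row type exists there, but it does not explain by itself why the console and local file sinks are unaffected.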
   
   ### Zeta or Flink or Spark Version
   
   Zeta
   
   ### Java or Scala Version
   
   openjdk version "1.8.0_282"
   
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

