OYRoy opened a new issue, #8782:
URL: https://github.com/apache/seatunnel/issues/8782

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   I'm trying to integrate data from multiple PostgreSQL tables into another 
PostgreSQL table, adding a column that describes which table each row 
originates from and another column for record batch codes. I used metadata 
transformation and SQL transformation for this purpose. When using the console 
sink, these transformations work successfully; however, when switching to the 
JDBC sink, the creation of the new table fails due to the additional columns. I 
haven't found any properties in the documentation that allow configuring the 
JDBC sink to recognize these new columns. Is this a bug? Or how can I correctly 
configure my settings?
   
   ### SeaTunnel Version
   
   2.3.9
   
   ### SeaTunnel Config
   
   ```conf
   env {
     job.mode = "STREAMING"
     parallelism = 4
   }
   source {
   Jdbc {
           url = "jdbc:postgresql://xxx.xxx.xxx.xxx:xxxx/test"
           driver = "org.postgresql.Driver"
           user = "xxxx"
           password = "xxxx"
           schema_name = "test"
           database_name = "test"
           table_pattern = "^test_[a-z]_\\d{4}$"
           plugin_output = "source_table"
       }
   }
   transform {
   Sql {
           plugin_input = "source_table"
           query = "select *, '123123abc' as batch_code from source_table"
           plugin_output = "source_table1"
   }
   Metadata {
           plugin_input = "source_table1"
           metadata_fields {
                   Table = table
           }
           plugin_output = "source_table2"
   }
   }
   sink {
   jdbc {
           plugin_input = "source_table2"
           url = "jdbc:postgresql://xxx.xxx.xxx.xxx:xxxx/test"
           driver = "org.postgresql.Driver"
           user = "xxxxxx"
           password = "xxxxx"
           table = "testt.ttest_a_1999"
           generate_sink_sql = true
           database = "test"
           schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        }
   }
   ```
   
   ### Running Command
   
   ```shell
   ./bin/seatunnel.sh --config testconfig -m local
   ```
   
   ### Error Exception
   
   ```log
   2025-02-21 11:24:29,238 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] 
[seatunnel-coordinator-service-2] - Execute sql : CREATE TABLE 
"testt"."ttest_a_1999" (
   "a" varchar(255),
   "b" varchar(255),
   "c" varchar(255),
   "d" varchar(255),
   "e" varchar(255),
   "f" varchar(255),
   "batch_code" null,
   "table" null
   );
   2025-02-21 11:24:29,242 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] 
[seatunnel-coordinator-service-2] - Catalog Postgres closing
   2025-02-21 11:24:29,242 ERROR [o.a.s.e.s.CoordinatorService  ] 
[seatunnel-coordinator-service-2] - [localhost]:5801 [seatunnel-271533] [5.1] 
submit job 945159011949346817 error 
org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: 
ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at 
org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:527)
        at 
org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:533)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at 
java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
        at 
org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:256)
        at 
org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$6(CoordinatorService.java:649)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
   Caused by: 
org.apache.seatunnel.api.table.catalog.exception.CatalogException: 
ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed 
creating table test.testt.ttest_a_1999
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog.createTableInternal(PostgresCatalog.java:185)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:432)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.sink.savemode.JdbcSaveModeHandler.createTable(JdbcSaveModeHandler.java:48)
        at 
org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:129)
        at 
org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:88)
        at 
org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:40)
        at 
org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
        at 
org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:525)
        ... 21 more
   Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near 
"null"
     Position: 160
        at 
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
        at 
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
        at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
        at 
org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
        at 
org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:177)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:627)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog.createTableInternal(PostgresCatalog.java:167)
        ... 28 more
   ```
   
   ### Zeta or Flink or Spark Version
   
   Zeta
   
   ### Java or Scala Version
   
   openjdk 21.0.6
   
   ### Screenshots
   
   
![Image](https://github.com/user-attachments/assets/9d2e64b5-4a42-4cc9-b782-563526728ff0)
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to