Edward Zhang created FLINK-39407:
------------------------------------

             Summary: CreateTableEvent failed in MySQL to Paimon pipeline
                 Key: FLINK-39407
                 URL: https://issues.apache.org/jira/browse/FLINK-39407
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: 1.20.3, 1.19.3
            Reporter: Edward Zhang


## In quickstart
I used the cdc-up quickstart script `cdcup.sh` to test a MySQL to Paimon
pipeline. After setting the Flink version to 1.19.3 or 1.20.3 with Flink CDC
3.2.1, I started cdc-up and ran the pipeline task (a `test_table` with a
primary key had been created in advance).

The pipeline job failed. Here is the JobManager log:

```txt
2026-04-07 16:48:26,894 INFO  org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler [] - SchemaChangeStatus switched from APPLYING to FINISHED for request [CreateTableEvent{tableId=cdcup.test_table, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255)}, primaryKeys=id, options=()}].
2026-04-07 16:48:27,755 INFO  org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler [] - SchemaChangeStatus switched from FINISHED to IDLE for request [CreateTableEvent{tableId=cdcup.test_table, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255)}, primaryKeys=id, options=()}]
2026-04-07 16:48:27,764 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition' (operator 9899a42c64d67ef3172b7e3be3c1bbb9).
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651) ~[flink-dist-1.19.3.jar:1.19.3]
        at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.clearCurrentSchemaChangeRequest(SchemaRegistryRequestHandler.java:460) ~[?:?]
        at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.getSchemaChangeResult(SchemaRegistryRequestHandler.java:321) ~[?:?]
        at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.lambda$handleCoordinationRequest$3(SchemaRegistry.java:276) ~[?:?]
        at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.lambda$runInEventLoop$2(SchemaRegistry.java:241) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
        at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.RuntimeException: Failed to apply schema change.
        ... 7 more
Caused by: org.apache.flink.cdc.common.exceptions.SchemaEvolveException: org.apache.paimon.catalog.Catalog$DatabaseNotExistException: Database cdcup does not exist.
        at org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applySchemaChange(PaimonMetadataApplier.java:135) ~[?:?]
        at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:238) ~[?:?]
        at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$flushSuccess$0(SchemaRegistryRequestHandler.java:306) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
        ... 3 more
Caused by: org.apache.paimon.catalog.Catalog$DatabaseNotExistException: Database cdcup does not exist.
        at org.apache.paimon.catalog.AbstractCatalog.createTable(AbstractCatalog.java:240) ~[?:?]
        at org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applyCreateTable(PaimonMetadataApplier.java:172) ~[?:?]
        at org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applySchemaChange(PaimonMetadataApplier.java:122) ~[?:?]
        at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:238) ~[?:?]
        at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$flushSuccess$0(SchemaRegistryRequestHandler.java:306) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
        ... 3 more
```
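
With nested exceptions like these, the innermost `Caused by:` entry is usually the one to look at. The following is a minimal Python sketch for pulling it out of a saved log; it is not part of Flink or Flink CDC, and the function name `root_cause` is only illustrative. It assumes the log is available as plain text and tolerates messages that were wrapped across lines.

```python
import re


def root_cause(log_text: str) -> str:
    """Return the message of the last 'Caused by:' entry in a Java stack trace.

    Returns an empty string when the text contains no 'Caused by:' marker.
    """
    # Split at every 'Caused by:' marker; the final chunk holds the innermost cause.
    chunks = log_text.split("Caused by:")
    if len(chunks) < 2:
        return ""
    tail = chunks[-1]
    # The message ends where the first stack frame ('at ...') or an
    # elision line ('... N more') begins.
    message = re.split(r"\n\s*(?:at\b|\.\.\.)", tail, maxsplit=1)[0]
    # Collapse whitespace so a message wrapped over several lines becomes one line.
    return " ".join(message.split())
```

Applied to the trace above, this should return the `Catalog$DatabaseNotExistException` line rather than the generic `Failed to apply schema change.` wrapper.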

The log shows `Catalog$DatabaseNotExistException: Database cdcup does not
exist.`, and the warehouse directory configured in `pipeline-definition.yaml`
is empty:

```txt
# ls
paimon-warehouse
# cd paimon-warehouse
# ls
# 
```
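
The same check can be scripted. This is a minimal Python sketch, assuming the Paimon filesystem catalog stores each database as a `<database>.db` directory directly under the warehouse (the `/data/cdcup.db` location in the second log in this report is consistent with that naming). The helper names `database_dirs` and `has_database` are hypothetical.

```python
from pathlib import Path


def database_dirs(warehouse: str) -> list[str]:
    """List database names found under a filesystem-catalog warehouse directory."""
    root = Path(warehouse)
    if not root.is_dir():
        return []
    # Assumption: every database is a directory named '<database>.db'.
    return sorted(
        p.name[: -len(".db")]
        for p in root.iterdir()
        if p.is_dir() and p.name.endswith(".db")
    )


def has_database(warehouse: str, database: str) -> bool:
    """Return True if '<warehouse>/<database>.db' exists as a directory."""
    return database in database_dirs(warehouse)
```

For the empty `paimon-warehouse` directory shown above, `database_dirs` would return an empty list, which matches the `DatabaseNotExistException` for `cdcup`.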

## In manual testing

I also tested manually with Flink CDC 3.5.0 (Flink 1.20.3), using this
pipeline YAML file:

```yaml
pipeline:
  parallelism: 1
source:
  type: mysql
  hostname: mysql
  port: 3306
  username: root
  password: ''
  tables: cdcup.\.*
  server-id: 5400-6400
  server-time-zone: UTC
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /data/paimon-warehouse
```

A similar error appears:

```txt
2026-04-07 16:30:28,078 ERROR org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry [] - An exception was triggered from Schema change applying task. Job will fail now.
org.apache.flink.util.FlinkRuntimeException: Failed to apply schema change event.
        at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.lambda$startSchemaChangesEvolve$1(SchemaCoordinator.java:249) ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
        at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.RuntimeException: Create database location failed, database: cdcup, location: /data/cdcup.db
        at org.apache.paimon.catalog.FileSystemCatalog.createDatabaseImpl(FileSystemCatalog.java:77) ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
        at org.apache.paimon.catalog.AbstractCatalog.createDatabase(AbstractCatalog.java:165) ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
        at org.apache.paimon.catalog.Catalog.createDatabase(Catalog.java:90) ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
        at org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applyCreateTable(PaimonMetadataApplier.java:166) ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
        at org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.lambda$applySchemaChange$2(PaimonMetadataApplier.java:135) ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
        at org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor.visit(SchemaChangeEventVisitor.java:57) ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
        at org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applySchemaChange(PaimonMetadataApplier.java:124) ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
        at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.applyAndUpdateEvolvedSchemaChange(SchemaCoordinator.java:437) ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
        at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.applySchemaChange(SchemaCoordinator.java:406) ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
        at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.lambda$startSchemaChangesEvolve$1(SchemaCoordinator.java:247) ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
        ... 5 more
```
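
Note that the location reported in the trace is `/data/cdcup.db`, while the pipeline YAML sets `catalog.properties.warehouse` to `/data/paimon-warehouse`. The sketch below is one way to narrow this down from inside the container. It assumes (not confirmed against the Paimon source) that the exception corresponds to a failed directory creation at the reported location; `probe_location` is a hypothetical helper, and it only inspects the local filesystem.

```python
import os
from pathlib import Path


def probe_location(location: str, warehouse: str) -> dict:
    """Report basic facts about a database location that failed to be created."""
    loc = Path(location)
    wh = Path(warehouse)
    # Walk up to the nearest ancestor that already exists on disk.
    parent = loc
    while not parent.exists() and parent != parent.parent:
        parent = parent.parent
    return {
        # Pure path comparison: is the location nested under the configured warehouse?
        "under_warehouse": wh in loc.parents,
        "nearest_existing_parent": str(parent),
        # Creating a subdirectory needs write and execute permission on the parent.
        "parent_writable": os.access(parent, os.W_OK | os.X_OK),
    }
```

Calling `probe_location("/data/cdcup.db", "/data/paimon-warehouse")` in the JobManager container would show whether the process can write to the nearest existing parent directory, and it reports `under_warehouse` as false for these two paths.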



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
