Edward Zhang created FLINK-39407:
------------------------------------
Summary: CreateTableEvent failed in MySQL to Paimon pipeline
Key: FLINK-39407
URL: https://issues.apache.org/jira/browse/FLINK-39407
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: 1.20.3, 1.19.3
Reporter: Edward Zhang
## In quickstart
I used the cdc-up quickstart script `cdcup.sh` to test a MySQL to Paimon
pipeline. With Flink 1.19.3 or 1.20.3 and Flink CDC 3.2.1 selected, I started
the environment and submitted the pipeline job (a `test_table` with a primary
key had been created in advance).
The pipeline job failed. Here is the JobManager log:
```txt
2026-04-07 16:48:26,894 INFO
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler
[] - SchemaChangeStatus switched from APPLYING to FINISHED for request
[CreateTableEvent{tableId=cdcup.test_table, schema=columns={`id` INT NOT
NULL,`name` VARCHAR(255)}, primaryKeys=id, options=()}].
2026-04-07 16:48:27,755 INFO
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler
[] - SchemaChangeStatus switched from FINISHED to IDLE for request
[CreateTableEvent{tableId=cdcup.test_table, schema=columns={`id` INT NOT
NULL,`name` VARCHAR(255)}, primaryKeys=id, options=()}]
2026-04-07 16:48:27,764 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by
OperatorCoordinator for 'Source: Flink CDC Event Source: mysql ->
SchemaOperator -> PrePartition' (operator 9899a42c64d67ef3172b7e3be3c1bbb9).
at
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651)
~[flink-dist-1.19.3.jar:1.19.3]
at
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.clearCurrentSchemaChangeRequest(SchemaRegistryRequestHandler.java:460)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.getSchemaChangeResult(SchemaRegistryRequestHandler.java:321)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.lambda$handleCoordinationRequest$3(SchemaRegistry.java:276)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.lambda$runInEventLoop$2(SchemaRegistry.java:241)
~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.RuntimeException: Failed to apply schema change.
... 7 more
Caused by: org.apache.flink.cdc.common.exceptions.SchemaEvolveException:
org.apache.paimon.catalog.Catalog$DatabaseNotExistException: Database cdcup
does not exist.
at
org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applySchemaChange(PaimonMetadataApplier.java:135)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:238)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$flushSuccess$0(SchemaRegistryRequestHandler.java:306)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
... 3 more
Caused by: org.apache.paimon.catalog.Catalog$DatabaseNotExistException:
Database cdcup does not exist.
at
org.apache.paimon.catalog.AbstractCatalog.createTable(AbstractCatalog.java:240)
~[?:?]
at
org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applyCreateTable(PaimonMetadataApplier.java:172)
~[?:?]
at
org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applySchemaChange(PaimonMetadataApplier.java:122)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:238)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$flushSuccess$0(SchemaRegistryRequestHandler.java:306)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
... 3 more
```
The log shows `Catalog$DatabaseNotExistException: Database cdcup does not exist.`,
and the warehouse directory configured in `pipeline-definition.yaml` is
empty:
```txt
# ls
paimon-warehouse
# cd paimon-warehouse
# ls
#
```
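For reference, a Paimon filesystem catalog normally stores each database as a `<database>.db` directory under the warehouse path, so an empty warehouse suggests that no database was ever created there. Below is a minimal sketch for listing the databases present in a warehouse directory. The helper name `list_paimon_databases` is hypothetical and not part of Flink CDC or Paimon, and the relative path `paimon-warehouse` is taken from the listing above.

```python
from pathlib import Path


def list_paimon_databases(warehouse: str) -> list[str]:
    """Return database names found as `<name>.db` directories under the warehouse.

    Assumes the filesystem catalog layout `<warehouse>/<database>.db`.
    Returns an empty list if the warehouse directory is missing or empty.
    """
    root = Path(warehouse)
    if not root.is_dir():
        return []
    return sorted(
        entry.name[: -len(".db")]
        for entry in root.iterdir()
        if entry.is_dir() and entry.name.endswith(".db")
    )


# An empty result matches the empty `ls` output shown above.
print(list_paimon_databases("paimon-warehouse"))
```

If the pipeline had created the database, `cdcup` would be expected in the returned list.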
## In manual testing
I also tested manually with Flink CDC 3.5.0 on Flink 1.20.3, using this
pipeline YAML file:
```yaml
pipeline:
  parallelism: 1
source:
  type: mysql
  hostname: mysql
  port: 3306
  username: root
  password: ''
  tables: cdcup.\.*
  server-id: 5400-6400
  server-time-zone: UTC
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /data/paimon-warehouse
A similar error appears:
```txt
2026-04-07 16:30:28,078 ERROR
org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry [] - An
exception was triggered from Schema change applying task. Job will fail now.
org.apache.flink.util.FlinkRuntimeException: Failed to apply schema change
event.
at
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.lambda$startSchemaChangesEvolve$1(SchemaCoordinator.java:249)
~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
[?:?]
at java.base/java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source) [?:?]
at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.RuntimeException: Create database location failed,
database: cdcup, location: /data/cdcup.db
at
org.apache.paimon.catalog.FileSystemCatalog.createDatabaseImpl(FileSystemCatalog.java:77)
~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
at
org.apache.paimon.catalog.AbstractCatalog.createDatabase(AbstractCatalog.java:165)
~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
at org.apache.paimon.catalog.Catalog.createDatabase(Catalog.java:90)
~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
at
org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applyCreateTable(PaimonMetadataApplier.java:166)
~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
at
org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.lambda$applySchemaChange$2(PaimonMetadataApplier.java:135)
~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
at
org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor.visit(SchemaChangeEventVisitor.java:57)
~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
at
org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applySchemaChange(PaimonMetadataApplier.java:124)
~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
at
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.applyAndUpdateEvolvedSchemaChange(SchemaCoordinator.java:437)
~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
at
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.applySchemaChange(SchemaCoordinator.java:406)
~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
at
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.lambda$startSchemaChangesEvolve$1(SchemaCoordinator.java:247)
~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
... 5 more
```
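Note that the exception reports `location: /data/cdcup.db`, while the configured warehouse is `/data/paimon-warehouse`. A small sketch for checking whether a reported location lies under the configured warehouse follows. The helper name `location_under_warehouse` is hypothetical, and the regular expression assumes the `location: <path>` wording seen in this particular message.

```python
import re
from pathlib import PurePosixPath


def location_under_warehouse(error_line: str, warehouse: str) -> bool:
    """Check whether the `location:` path in an error message is inside the warehouse."""
    match = re.search(r"location:\s*(\S+)", error_line)
    if match is None:
        raise ValueError("no 'location:' field found in the error line")
    location = PurePosixPath(match.group(1))
    # `parents` lists every ancestor directory of the reported location.
    return PurePosixPath(warehouse) in location.parents


# Message text copied from the log above (joined onto one line).
error = "Create database location failed, database: cdcup, location: /data/cdcup.db"
print(location_under_warehouse(error, "/data/paimon-warehouse"))  # prints False
```

A `False` result here means the database directory was attempted next to the warehouse directory rather than inside it, which may be worth checking when triaging this issue.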