Ran Tao created FLINK-39934:
-------------------------------
Summary: Postgres pipeline connector reports misleading table format error message
Key: FLINK-39934
URL: https://issues.apache.org/jira/browse/FLINK-39934
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Reporter: Ran Tao
*Description*
When the Postgres pipeline connector is configured with an invalid 'tables'
pattern, the validation error message does not show the configured value.
Instead, it always prints the option key itself ('tables'), which makes
the error hard to diagnose.
For example, with the following pipeline source config:
{code:yaml}
sources:
  - source:
      type: postgres
      name: Postgres Source
      hostname: <host>
      port: 5432
      username: flink
      password: <password>
      tables: flink.public.orders_.*
      slot.name: pg_source_slot_test
      decoding.plugin.name: pgoutput
      scan.startup.mode: initial
{code}
The table pattern is invalid because the unescaped dot in orders_.* splits it
into four parts instead of three. However, the thrown exception prints:
Tables format must db.schema.table, can not 'tables' = tables
The message should print the actual invalid table pattern, for example:
Tables format must db.schema.table, can not 'tables' = flink.public.orders_.*
*Stack Trace*
{code:java}
Caused by: java.lang.IllegalStateException: Tables format must db.schema.table, can not 'tables' = tables
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.cdc.connectors.postgres.factory.PostgresDataSourceFactory.getValidateDatabaseName(PostgresDataSourceFactory.java:382)
    at org.apache.flink.cdc.connectors.postgres.factory.PostgresDataSourceFactory.createDataSource(PostgresDataSourceFactory.java:146)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.createDataSource(DataSourceTranslator.java:132)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.createDataSources(DataSourceTranslator.java:112)
    at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.translate(FlinkPipelineComposer.java:150)
    at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:110)
    at org.apache.flink.cdc.cli.CliExecutor.deployWithComposer(CliExecutor.java:132)
    at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:97)
    at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:81)
{code}
*Root Cause*
PostgresDataSourceFactory#getValidateDatabaseName uses TABLES.key() when
formatting the validation error message, so the message always prints tables
instead of the actual invalid table value.
*Proposed Fix*
Use the current table pattern value, trimmedTableName, in the error message
instead of TABLES.key().
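To illustrate the difference, here is a minimal standalone sketch. It is not the actual PostgresDataSourceFactory code: the class name, the helper methods, the TABLES_KEY constant (a stand-in for TABLES.key()), and the split-on-every-dot rule are all assumptions made for illustration. Only the message text and the trimmedTableName name come from this report.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; does not depend on Flink and is not the connector's real code.
class TablePatternValidationSketch {

    // Assumed stand-in for TABLES.key() in the connector.
    static final String TABLES_KEY = "tables";

    static final String MESSAGE_TEMPLATE =
            "Tables format must db.schema.table, can not 'tables' = %s";

    // Simplification: split on every dot, ignoring any escaping rules.
    static List<String> splitOnDots(String pattern) {
        return Arrays.asList(pattern.split("\\."));
    }

    // Mirrors the reported behavior: the option key is formatted, not the value.
    static String buggyMessage(String trimmedTableName) {
        return String.format(MESSAGE_TEMPLATE, TABLES_KEY);
    }

    // Proposed behavior: the configured pattern is formatted into the message.
    static String fixedMessage(String trimmedTableName) {
        return String.format(MESSAGE_TEMPLATE, trimmedTableName);
    }

    // Rejects any pattern that does not split into exactly three parts.
    static void validate(String tableName) {
        String trimmedTableName = tableName.trim();
        if (splitOnDots(trimmedTableName).size() != 3) {
            throw new IllegalStateException(fixedMessage(trimmedTableName));
        }
    }

    public static void main(String[] args) {
        String pattern = "flink.public.orders_.*";
        // prints: Tables format must db.schema.table, can not 'tables' = tables
        System.out.println(buggyMessage(pattern));
        // prints: Tables format must db.schema.table, can not 'tables' = flink.public.orders_.*
        System.out.println(fixedMessage(pattern));
    }
}
```

With the fixed message, the user can see right away which configured pattern was rejected, without having to trace back through the pipeline YAML.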