This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 41581e5700c [fix](streaming-job) drop neighbour-table rows leaked by JDBC LIKE wildcards in JdbcPostgreSQLClient (#63402)
41581e5700c is described below

commit 41581e5700c3c5eb51e40d1ce2b6d7f35f5db279
Author: wudi <[email protected]>
AuthorDate: Tue May 26 21:02:45 2026 +0800

    [fix](streaming-job) drop neighbour-table rows leaked by JDBC LIKE wildcards in JdbcPostgreSQLClient (#63402)
    
    ### What problem does this PR solve?
    
    `JdbcPostgreSQLClient.getJdbcColumnsInfo` calls
    `DatabaseMetaData.getColumns(catalog, schemaPattern, tableNamePattern,
    columnNamePattern)`. Per the JDBC spec the 2nd and 3rd arguments are
    **SQL LIKE patterns**, so literal `_` / `%` characters in the requested
    schema or table name are interpreted as wildcards by the Postgres
    driver. When a streaming
    job is created with `include_tables = "user_info_pg_normal1"` and a
    neighbour table like `userXinfo_pg_normal1` happens to coexist in the
    same schema, the metadata query returns columns from **both** tables.
    The combined result then trips `CREATE TABLE` on the Doris side with
    errors such as `errCode = 2, detailMessage = Duplicate column name
    'name'`, or pollutes the auto-created table schema with stray columns.
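    
    To make the wildcard behaviour concrete, the minimal Java sketch below
    approximates LIKE matching by translating the pattern into a regex
    (`_` becomes exactly one arbitrary character, `%` any run of
    characters). `LikePatternDemo` and `likeMatches` are hypothetical names;
    the sketch ignores escape sequences and driver-specific details such as
    case folding. It only illustrates why the pattern
    `user_info_pg_normal1` also selects `userXinfo_pg_normal1`, and why the
    schema pattern `cdc_test` also selects `cdcXtest`.
    
    ```java
    import java.util.regex.Pattern;
    
    // Hypothetical helper, not part of Doris or the JDBC API: approximates how
    // a driver evaluates a DatabaseMetaData name pattern with LIKE semantics.
    final class LikePatternDemo {
    
        // Translate a LIKE pattern into a regex that must match the whole name:
        // `_` matches one character, `%` matches any run, everything else is
        // literal. Escape sequences are deliberately not handled here.
        static boolean likeMatches(String likePattern, String candidate) {
            StringBuilder regex = new StringBuilder();
            for (char c : likePattern.toCharArray()) {
                if (c == '_') {
                    regex.append('.');
                } else if (c == '%') {
                    regex.append(".*");
                } else {
                    regex.append(Pattern.quote(String.valueOf(c)));
                }
            }
            return Pattern.matches(regex.toString(), candidate);
        }
    
        public static void main(String[] args) {
            String requested = "user_info_pg_normal1";
            String[] tables = {"user_info_pg_normal1", "userXinfo_pg_normal1",
                    "user_info_pg_normal10"};
            for (String table : tables) {
                System.out.println(table + " -> " + likeMatches(requested, table));
            }
        }
    }
    ```
    
    Running `main` prints `true` for the target and for the decoy, and
    `false` for `user_info_pg_normal10`: without a `%` the pattern has to
    match the full length of the name.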
    
    The repro is trivial: in the same Postgres schema create
    
    - `user_info_pg_normal1(name varchar, age int2)` — the table we want to
    capture
    - `userXinfo_pg_normal1(name varchar, weight float8)` — a decoy whose
    name only differs from the target by a single character that `_` matches
    
    then run `CREATE JOB ... include_tables = "user_info_pg_normal1"`.
    Without the fix, the schema fetched for the target either leaks
    `weight` or fails with `Duplicate column name 'name'`, depending on
    column order.
    
    Fix: after fetching the `ResultSet`, drop rows whose `TABLE_SCHEM` or
    `TABLE_NAME` does not exactly equal the requested `remoteDbName` or
    `remoteTableName`, respectively. We deliberately do
    **not** escape `_` / `%` at the source — relying on
    `DatabaseMetaData.getSearchStringEscape()` is driver-version dependent
    (older Oracle drivers don't honour escape sequences in `getTables`),
    while filtering on the consumer side is deterministic and
    driver-agnostic.
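    
    The consumer-side filter can be sketched without a live connection by
    modelling `getColumns` rows as plain objects. `ExactNameFilter`, its
    nested `ColumnRow` class and the sample rows are hypothetical stand-ins
    for the `ResultSet`; the two equality checks mirror the `TABLE_SCHEM` /
    `TABLE_NAME` guard added to `JdbcPostgreSQLClient` in the diff.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    final class ExactNameFilter {
    
        // Hypothetical stand-in for one DatabaseMetaData.getColumns() row; only
        // the TABLE_SCHEM, TABLE_NAME and COLUMN_NAME columns are modelled.
        static final class ColumnRow {
            final String tableSchem;
            final String tableName;
            final String columnName;
    
            ColumnRow(String tableSchem, String tableName, String columnName) {
                this.tableSchem = tableSchem;
                this.tableName = tableName;
                this.columnName = columnName;
            }
        }
    
        // Keep only rows whose schema and table equal the requested names
        // exactly. Calling equals() on the requested (non-null) names keeps the
        // check null-safe if a driver returns NULL for TABLE_SCHEM.
        static List<ColumnRow> keepExactMatches(List<ColumnRow> rows,
                String remoteDbName, String remoteTableName) {
            List<ColumnRow> kept = new ArrayList<>();
            for (ColumnRow row : rows) {
                if (!remoteDbName.equals(row.tableSchem)
                        || !remoteTableName.equals(row.tableName)) {
                    continue; // neighbour row pulled in by a `_` or `%` wildcard
                }
                kept.add(row);
            }
            return kept;
        }
    
        public static void main(String[] args) {
            // Rows a LIKE-based lookup of cdc_test.user_info_pg_normal1 could
            // return when both decoys from the regression test exist.
            List<ColumnRow> rows = Arrays.asList(
                    new ColumnRow("cdc_test", "user_info_pg_normal1", "name"),
                    new ColumnRow("cdc_test", "user_info_pg_normal1", "age"),
                    new ColumnRow("cdc_test", "userXinfo_pg_normal1", "name"),
                    new ColumnRow("cdc_test", "userXinfo_pg_normal1", "weight"),
                    new ColumnRow("cdcXtest", "user_info_pg_normal1", "name"),
                    new ColumnRow("cdcXtest", "user_info_pg_normal1", "height"));
            for (ColumnRow row : keepExactMatches(rows, "cdc_test", "user_info_pg_normal1")) {
                System.out.println(row.columnName);
            }
        }
    }
    ```
    
    `main` prints only `name` and `age`; the decoy rows that would otherwise
    cause `Duplicate column name 'name'` or stray `weight` / `height`
    columns are skipped, whatever the driver's escaping behaviour.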
    
    Scope:
    
    - Only `JdbcPostgreSQLClient` is patched. This is the path used by
    Postgres streaming jobs (the failing case). MySQL streaming jobs were
    checked against the same decoy pattern and do not reproduce the bug
    because MySQL Connector/J doesn't pull neighbour rows here in practice —
    so `JdbcMySQLClient` is left untouched in this PR.
    - The JDBC catalog path lives in a separate module
    (`fe-connector-jdbc/.../JdbcConnectorClient`) and is **not** part of
    this PR. It already does partial escape but intentionally skips `_` /
    `%` for driver-compatibility reasons; a follow-up can apply the same
    after-the-fact filter there.
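    
    For comparison, source-side escaping (the approach this PR deliberately
    avoids) would look roughly like the sketch below.
    `LikeEscapeSketch.escapeLikeLiteral` is a hypothetical helper; the escape
    string is passed in as a parameter, standing in for the value a driver
    reports through `DatabaseMetaData.getSearchStringEscape()`. Whether a
    driver then honours that escape in `getColumns` / `getTables` is exactly
    the driver-dependent behaviour described above, so the after-the-fact
    filter stays useful even where escaping is applied.
    
    ```java
    final class LikeEscapeSketch {
    
        // Escape LIKE metacharacters in a literal schema or table name. `escape`
        // is assumed to be the string a driver reports via
        // DatabaseMetaData.getSearchStringEscape() (often a single backslash).
        static String escapeLikeLiteral(String name, String escape) {
            if (escape == null || escape.isEmpty()) {
                // The driver reports no escape string: nothing can be escaped.
                return name;
            }
            StringBuilder out = new StringBuilder(name.length() + 8);
            int i = 0;
            while (i < name.length()) {
                if (name.startsWith(escape, i)) {
                    // A literal occurrence of the escape string is itself escaped.
                    out.append(escape).append(escape);
                    i += escape.length();
                    continue;
                }
                char c = name.charAt(i);
                if (c == '_' || c == '%') {
                    out.append(escape); // make the wildcard a literal character
                }
                out.append(c);
                i++;
            }
            return out.toString();
        }
    
        public static void main(String[] args) {
            System.out.println(escapeLikeLiteral("user_info_pg_normal1", "\\"));
        }
    }
    ```
    
    With a backslash escape, `main` prints `user\_info\_pg\_normal1`.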
---
 .../jdbc/client/JdbcPostgreSQLClient.java          |  5 ++++
 .../cdc/test_streaming_postgres_job.groovy         | 27 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
index 6ee651ad24e..46d94f90b4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
@@ -63,6 +63,11 @@ public class JdbcPostgreSQLClient extends JdbcClient {
             String catalogName = getCatalogName(conn);
             rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, remoteTableName);
             while (rs.next()) {
+                // getColumns treats schema/table as LIKE patterns; drop rows pulled in via `_`/`%`.
+                if (!remoteDbName.equals(rs.getString("TABLE_SCHEM"))
+                        || !remoteTableName.equals(rs.getString("TABLE_NAME"))) {
+                    continue;
+                }
                 int dataType = rs.getInt("DATA_TYPE");
                 int arrayDimensions = 0;
                 if (dataType == Types.ARRAY) {
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index 9aacdd22c98..b16e13a74cd 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -75,6 +75,29 @@ suite("test_streaming_postgres_job", "p0,external,pg,external_docker,external_do
             sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age) VALUES ('A2', 1);"""
             sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age) VALUES ('B2', 2);"""
 
+            // Decoy table whose name differs from table1 by a single char that JDBC LIKE
+            // matches via `_`. Different column shape (weight) so any schema leak surfaces
+            // as a Duplicate-column error on CREATE TABLE or a stray `weight` column.
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.userXinfo_pg_normal1"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.userXinfo_pg_normal1 (
+                  "name" varchar(200),
+                  "weight" float8,
+                  PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.userXinfo_pg_normal1 (name, weight) VALUES ('decoy1', 1.5);"""
+
+            // Decoy schema whose name differs from pgSchema (cdc_test) by a single char that
+            // JDBC LIKE matches via `_`. A same-named table with a different column shape
+            // (height) lives inside, so any schema-level LIKE leak surfaces as a stray
+            // `height` column or a Duplicate-column error.
+            sql """DROP SCHEMA IF EXISTS cdcXtest CASCADE"""
+            sql """CREATE SCHEMA cdcXtest"""
+            sql """CREATE TABLE ${pgDB}.cdcXtest.${table1} (
+                  "name" varchar(200),
+                  "height" float8,
+                  PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.cdcXtest.${table1} (name, height) VALUES ('schema_decoy', 9.9);"""
         }
 
         sql """CREATE JOB ${jobName}
@@ -108,6 +131,10 @@ suite("test_streaming_postgres_job", "p0,external,pg,external_docker,external_do
         assert createTalInfo.contains("`age` smallint");
         assert createTalInfo.contains("UNIQUE KEY(`name`)");
         assert createTalInfo.contains("DISTRIBUTED BY HASH(`name`) BUCKETS AUTO");
+        // Guard: decoy table userXinfo_pg_normal1 must not leak its `weight` column.
+        assert !createTalInfo.contains("`weight`");
+        // Guard: decoy schema cdcXtest must not leak its `height` column either.
+        assert !createTalInfo.contains("`height`");
 
         // check job running
         try {

