github-actions[bot] commented on code in PR #64728:
URL: https://github.com/apache/doris/pull/64728#discussion_r3471537309


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -85,12 +85,19 @@ public static Env getCurrentEnv() {
     }
 
     public SourceReader getReader(JobBaseConfig jobConfig) {
+        return getReader(jobConfig, false);

Review Comment:
   This default leaves PG jobs without their Doris-owned slot/publication on the
rolling-upgrade path where an old FE talks to a new cdc_client. In the base FE,
`initOnCreate()` for initial/snapshot jobs did not call `/api/initReader`; it
only set the split progress, so the first CDC call is `/api/fetchSplits`, which
still goes through `Env.getReader(ftsReq)`. With this PR, that default creates a
`JobContext` with `createResources=false`, and `PostgresSourceReader.initialize()`
no longer creates the Doris-owned slot/publication. Because `getOrCreateReader()`
returns an existing context before it reaches the `createResources` branch, a
later init call cannot repair it either. The job can then finish its snapshot and
hit `validateStreamSource()` with `slotState == null`, pausing as "Replication
slot invalidated" during the rolling upgrade. Please either track
`resourcesCreated` in `JobContext` so that `getReader(..., true)` can provision
an existing reader, or make the `/api/fetchSplits` first-open path provision
resources for initial/snapshot PG jobs while still avoiding recreation on rebuild.
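A minimal sketch of the first option, in Java to match `Env.java`. It models the reader cache as a map of simplified `JobContext` objects carrying a `resourcesCreated` flag. The class `ReaderCacheSketch`, the injected `LongConsumer` provisioner, and the `getOrCreateReader(long, boolean)` signature are hypothetical stand-ins for illustration, not the real cdc_client API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;

// Hypothetical, simplified model of the cdc_client reader cache (not the real Env/JobContext API).
final class ReaderCacheSketch {

    static final class JobContext {
        final long jobId;
        // Whether the Doris-owned slot/publication have been provisioned for this context.
        private boolean resourcesCreated;

        JobContext(long jobId) {
            this.jobId = jobId;
        }

        synchronized boolean resourcesCreated() {
            return resourcesCreated;
        }
    }

    private final Map<Long, JobContext> contexts = new ConcurrentHashMap<>();
    // Stand-in for "create slot + publication", injected so the sketch runs without PostgreSQL.
    private final LongConsumer provisioner;

    ReaderCacheSketch(LongConsumer provisioner) {
        this.provisioner = provisioner;
    }

    JobContext getOrCreateReader(long jobId, boolean createResources) {
        // Reuse the cached context if one exists (e.g. first opened by a fetchSplits-style call).
        JobContext ctx = contexts.computeIfAbsent(jobId, JobContext::new);
        // Instead of returning early for an existing context, still honour createResources=true
        // when the context was first opened without provisioning.
        if (createResources) {
            synchronized (ctx) {
                if (!ctx.resourcesCreated) {
                    // If provisioning throws, the flag stays false so a later call can retry.
                    provisioner.accept(jobId);
                    ctx.resourcesCreated = true;
                }
            }
        }
        return ctx;
    }
}
```

Because the flag lives in memory, a reader rebuilt in a fresh process starts with `resourcesCreated=false`. Nothing is recreated as long as the rebuild path passes `createResources=false`. Whether the real rebuild path behaves that way is an assumption that would need checking against the PR.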



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_dropped_during_incremental.groovy:
##########
@@ -170,5 +170,131 @@ suite("test_streaming_postgres_job_slot_dropped_during_incremental",
             sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
         }
         sql """drop table if exists ${currentDb}.${table1} force"""
+
+        // ===== Second scenario: same drop, but a Doris-owned slot (no slot_name given) =====
+        // This is the case the fix actually guards: before it, the reader rebuilt after the drop
+        // would silently recreate the Doris-owned slot at the latest LSN and resume RUNNING (data
+        // loss). Now slot/publication are provisioned only at CREATE, so the rebuild finds the slot
+        // gone and fails non-resumably.
+        def dorisJob = "test_streaming_pg_slot_dropped_doris_owned_job"
+        def dorisTable = "slot_dropped_doris_pg_tbl"
+        sql """DROP JOB IF EXISTS where jobname = '${dorisJob}'"""
+        sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${dorisTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${dorisTable} (
+                  "id" int PRIMARY KEY,
+                  "name" varchar(200)
+                )"""
+        }
+
+        // No slot_name/publication_name: Doris owns and auto-creates them at CREATE (initReader).
+        sql """CREATE JOB ${dorisJob}
+                PROPERTIES ("max_interval" = "3")
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${dorisTable}",
+                    "offset" = "latest"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        Awaitility.await().atMost(120, SECONDS).pollInterval(1, SECONDS).until({
+            def st = sql """select status from jobs("type"="insert") where Name='${dorisJob}'"""
+            st.size() == 1 && st.get(0).get(0) == "RUNNING"
+        })
+
+        // Phase 1: confirm steady-state incremental sync
+        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            for (int i = 1; i <= 10; i++) {
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${dorisTable} VALUES (${i}, 'name_${i}')"""
+            }
+        }
+        Awaitility.await().atMost(180, SECONDS).pollInterval(2, SECONDS).until({
+            def cnt = sql """SELECT count(*) FROM ${currentDb}.${dorisTable}"""
+            cnt.size() == 1 && cnt.get(0).get(0) >= 10
+        })
+
+        // Discover the Doris-owned slot name (doris_cdc_<jobId>) created at CREATE.
+        def dorisSlot = null
+        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            def slots = sql """SELECT slot_name FROM pg_replication_slots WHERE slot_name LIKE 'doris_cdc_%'"""

Review Comment:
   This can drop the wrong slot. The slot created for this job is deterministic 
(`doris_cdc_<jobId>`), but this query returns every Doris-owned slot in the 
shared PG instance and picks `slots[0]` without `ORDER BY`. A leftover slot 
from a failed external test or a concurrent CDC suite can be selected, so the 
test either deletes another job's slot or later asserts against a slot that is 
not `dorisJob`'s, and it no longer proves the intended regression. Please query 
this job's `Id` from `jobs("type"="insert")` and use `doris_cdc_${jobId}` as 
the slot name, then drop/assert that exact slot.
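Below is a minimal Java sketch of that suggestion. The suite itself is Groovy, but the same calls translate directly. It assumes the job id has already been read with a query like `select Id from jobs("type"="insert") where Name='${dorisJob}'`, and that the `doris_cdc_<jobId>` naming described above holds. `OwnedSlotSketch` and its methods are hypothetical helpers, not part of the Doris regression framework.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;

// Hypothetical helpers illustrating the review suggestion; not part of the Doris regression framework.
final class OwnedSlotSketch {

    // Per the review, the slot Doris creates for a job is deterministic: doris_cdc_<jobId>.
    static String dorisOwnedSlotName(long jobId) {
        return "doris_cdc_" + jobId;
    }

    // Pick this job's slot by exact name instead of taking slots[0] from a LIKE 'doris_cdc_%' scan.
    static Optional<String> pickSlotForJob(List<String> slotNames, long jobId) {
        String expected = dorisOwnedSlotName(jobId);
        return slotNames.stream().filter(expected::equals).findFirst();
    }

    // Exact-match existence check, usable both before the drop and for the later assertion.
    static boolean slotExists(Connection conn, String slotName) throws SQLException {
        String sql = "SELECT count(*) FROM pg_replication_slots WHERE slot_name = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, slotName);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() && rs.getLong(1) > 0;
            }
        }
    }

    // Drops exactly the named slot; PostgreSQL rejects this while the slot is still active.
    static void dropSlot(Connection conn, String slotName) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
            ps.setString(1, slotName);
            ps.execute();
        }
    }
}
```

An exact comparison also avoids near-miss names such as `doris_cdc_12` versus `doris_cdc_123`, which any prefix-style filter would conflate.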



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
