Copilot commented on code in PR #64075:
URL: https://github.com/apache/doris/pull/64075#discussion_r3354055815


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -150,6 +157,43 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
         }
     }
 
+    /** Create/ensure the Doris-owned publication for all include_tables (idempotent, multi-BE safe). */
+    private void createPublicationForDorisOwned(
+            PostgresDialect dialect, Map<String, String> config, String jobId) {
+        String pubName = resolvePublicationName(config, jobId);
+        String schema = config.get(DataSourceConfigKeys.SCHEMA);
+        String[] qualified = ConfigUtil.getTableList(schema, config);
+        if (qualified.length == 0) {
+            throw new CdcClientException("No tables to create publication " + pubName);
+        }
+        String tableList =
+                Arrays.stream(qualified)
+                        .map(q -> new TableId(null, schema, q.substring(q.indexOf('.') + 1))
+                                .toDoubleQuotedString())
+                        .collect(Collectors.joining(", "));
+        // Mirrors debezium PostgresReplicationConnection#initPublication: check existence, then
+        // CREATE ... FOR TABLE / ALTER ... SET TABLE (here always the full include_tables set).
+        try (PostgresConnection conn = dialect.openJdbcConnection();
+                Statement stmt = conn.connection().createStatement()) {
+            long count;
+            try (ResultSet rs =
+                    stmt.executeQuery(
+                            "SELECT COUNT(1) FROM pg_publication WHERE pubname = '" + pubName + "'")) {
+                rs.next();
+                count = rs.getLong(1);
+            }
+            if (count == 0) {
+                stmt.execute("CREATE PUBLICATION " + pubName + " FOR TABLE " + tableList);
+            } else {
+                stmt.execute("ALTER PUBLICATION " + pubName + " SET TABLE " + tableList);
+            }

Review Comment:
   createPublicationForDorisOwned() is not actually “multi-BE safe”: the SELECT
   COUNT + CREATE/ALTER sequence is a check-then-act race in which two BEs can
   both observe count == 0, after which the slower CREATE PUBLICATION fails with
   duplicate_object and aborts reader initialization. This can make multi-BE
   jobs flaky, which is exactly the scenario this PR is trying to stabilize.
   Prefer an approach without the check-then-act window: try CREATE first and,
   on duplicate_object, fall back to ALTER ... SET TABLE (or always run ALTER
   after ensuring the publication exists).
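   A minimal sketch of the "try CREATE first, fall back to ALTER" suggestion,
   assuming the connection runs in autocommit mode (otherwise the failed CREATE
   would abort the surrounding transaction and the ALTER would fail too). The
   names PublicationEnsurer, SqlExecutor and ensurePublication are hypothetical,
   not part of the PR; SqlExecutor is only a seam over Statement#execute so the
   control flow can be exercised without a live PostgreSQL. Besides
   duplicate_object (42710), a truly concurrent CREATE may surface as
   unique_violation (23505) on the catalog's unique index, so the sketch treats
   both as "already exists". Identifier handling mirrors the diff's string
   concatenation.

```java
import java.sql.SQLException;

/** Sketch only: a race-tolerant "ensure publication" helper (hypothetical, not the PR's code). */
public class PublicationEnsurer {

    /** Hypothetical seam over java.sql.Statement#execute(String). */
    @FunctionalInterface
    public interface SqlExecutor {
        void execute(String sql) throws SQLException;
    }

    // SQLSTATE 42710 (duplicate_object): "publication ... already exists".
    static final String DUPLICATE_OBJECT = "42710";
    // SQLSTATE 23505 (unique_violation): a concurrent CREATE may instead fail
    // on the catalog's unique index on the publication name.
    static final String UNIQUE_VIOLATION = "23505";

    /**
     * Creates the publication or, if it already exists (pre-existing or just created
     * by another BE), resets it to exactly the given table list.
     * Assumes autocommit is on, so the failed CREATE does not poison the ALTER.
     */
    public static void ensurePublication(SqlExecutor exec, String pubName, String tableList)
            throws SQLException {
        try {
            // No separate SELECT COUNT: the CREATE itself is the existence check.
            exec.execute("CREATE PUBLICATION " + pubName + " FOR TABLE " + tableList);
        } catch (SQLException e) {
            String state = e.getSQLState();
            if (!DUPLICATE_OBJECT.equals(state) && !UNIQUE_VIOLATION.equals(state)) {
                throw e; // permission errors, missing tables, etc. still fail fast
            }
            // Lost the race (or it pre-existed): converge on the full include_tables set.
            exec.execute("ALTER PUBLICATION " + pubName + " SET TABLE " + tableList);
        }
    }
}
```

   Inside the PR's try-with-resources block this would be called as
   ensurePublication(stmt::execute, pubName, tableList). Whichever BE loses the
   race simply takes the ALTER path, so both end with the same publication
   definition instead of one aborting.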



-- 
This is an automated message from the Apache Git Service.