This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b171b057b71 [fix][test] Fix flaky PulsarDebeziumOracleSourceTest 
(#25314)
b171b057b71 is described below

commit b171b057b71c3ea42a96bebfa6d703db1b2c3b8f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 11 13:50:28 2026 -0700

    [fix][test] Fix flaky PulsarDebeziumOracleSourceTest (#25314)
---
 .../debezium/DebeziumOracleDbSourceTester.java     | 51 +++++++++++++++++-----
 1 file changed, 41 insertions(+), 10 deletions(-)

diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
index 09921d1ee1c..ab76ccefcb5 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -146,18 +146,15 @@ public class DebeziumOracleDbSourceTester extends 
SourceTester<DebeziumOracleDbC
         // configure logminer
         runSqlCmd("shutdown immediate");
 
-        // good first approximation but still not enough:
         waitForOracleStatus("ORACLE not available");
         Thread.sleep(SLEEP_AFTER_COMMAND_MS);
 
-        runSqlCmd("startup mount");
-        // good first approximation but still not enough:
-        waitForOracleStatus("MOUNTED");
+        // startup mount may need retries if Oracle is still shutting down
+        retryCommand("startup mount", "MOUNTED");
         Thread.sleep(SLEEP_AFTER_COMMAND_MS);
 
         runSqlCmd("alter database archivelog;");
         runSqlCmd("alter database open;");
-        // good first approximation but still not enough:
         waitForOracleStatus("OPEN");
         Thread.sleep(SLEEP_AFTER_COMMAND_MS);
 
@@ -175,15 +172,49 @@ public class DebeziumOracleDbSourceTester extends 
SourceTester<DebeziumOracleDbC
     }
 
     private void waitForOracleStatus(String status) throws Exception {
+        log.info("Waiting for Oracle status '{}'", status);
+        String lastStdout = "";
+        String lastStderr = "";
         for (int i = 0; i < 1000; i++) {
-            ContainerExecResult response = runSqlCmd("SELECT INSTANCE_NAME, 
STATUS, DATABASE_STATUS FROM V$INSTANCE;");
-            if ((response.getStderr() != null && 
response.getStderr().contains(status))
-                    || (response.getStdout() != null && 
response.getStdout().contains(status))) {
-                return;
+            try {
+                ContainerExecResult response =
+                        runSqlCmd("SELECT INSTANCE_NAME, STATUS, 
DATABASE_STATUS FROM V$INSTANCE;");
+                lastStdout = response.getStdout() != null ? 
response.getStdout() : "";
+                lastStderr = response.getStderr() != null ? 
response.getStderr() : "";
+                if (lastStderr.contains(status) || 
lastStdout.contains(status)) {
+                    log.info("Oracle reached status '{}' after {} attempts", 
status, i + 1);
+                    return;
+                }
+            } catch (Exception e) {
+                lastStderr = e.getMessage();
+                log.debug("Error polling Oracle status (attempt {}): {}", i + 
1, e.getMessage());
+            }
+            if (i % 30 == 29) {
+                log.info("Still waiting for Oracle status '{}' after {} 
attempts. "
+                        + "Last stdout: {}, Last stderr: {}", status, i + 1, 
lastStdout, lastStderr);
             }
             Thread.sleep(1000);
         }
-        throw new IllegalStateException("Oracle did not initialize properly");
+        throw new IllegalStateException(
+                String.format("Oracle did not reach status '%s'. Last stdout: 
%s, Last stderr: %s",
+                        status, lastStdout, lastStderr));
+    }
+
+    private void retryCommand(String cmd, String expectedStatus) throws 
Exception {
+        for (int attempt = 1; attempt <= 3; attempt++) {
+            runSqlCmd(cmd);
+            try {
+                waitForOracleStatus(expectedStatus);
+                return;
+            } catch (IllegalStateException e) {
+                if (attempt == 3) {
+                    throw e;
+                }
+                log.warn("Command '{}' did not lead to status '{}' (attempt 
{}), retrying...",
+                        cmd, expectedStatus, attempt);
+                Thread.sleep(SLEEP_AFTER_COMMAND_MS);
+            }
+        }
     }
 
     private ContainerExecResult runSqlCmd(String cmd) throws Exception {

Reply via email to