This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b171b057b71 [fix][test] Fix flaky PulsarDebeziumOracleSourceTest (#25314)
b171b057b71 is described below
commit b171b057b71c3ea42a96bebfa6d703db1b2c3b8f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 11 13:50:28 2026 -0700
[fix][test] Fix flaky PulsarDebeziumOracleSourceTest (#25314)
---
.../debezium/DebeziumOracleDbSourceTester.java | 51 +++++++++++++++++-----
1 file changed, 41 insertions(+), 10 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
index 09921d1ee1c..ab76ccefcb5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -146,18 +146,15 @@ public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbC
// configure logminer
runSqlCmd("shutdown immediate");
- // good first approximation but still not enough:
waitForOracleStatus("ORACLE not available");
Thread.sleep(SLEEP_AFTER_COMMAND_MS);
- runSqlCmd("startup mount");
- // good first approximation but still not enough:
- waitForOracleStatus("MOUNTED");
+ // startup mount may need retries if Oracle is still shutting down
+ retryCommand("startup mount", "MOUNTED");
Thread.sleep(SLEEP_AFTER_COMMAND_MS);
runSqlCmd("alter database archivelog;");
runSqlCmd("alter database open;");
- // good first approximation but still not enough:
waitForOracleStatus("OPEN");
Thread.sleep(SLEEP_AFTER_COMMAND_MS);
@@ -175,15 +172,49 @@ public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbC
}
private void waitForOracleStatus(String status) throws Exception {
+ log.info("Waiting for Oracle status '{}'", status);
+ String lastStdout = "";
+ String lastStderr = "";
for (int i = 0; i < 1000; i++) {
- ContainerExecResult response = runSqlCmd("SELECT INSTANCE_NAME, STATUS, DATABASE_STATUS FROM V$INSTANCE;");
- if ((response.getStderr() != null && response.getStderr().contains(status))
- || (response.getStdout() != null && response.getStdout().contains(status))) {
- return;
+ try {
+ ContainerExecResult response =
+ runSqlCmd("SELECT INSTANCE_NAME, STATUS, DATABASE_STATUS FROM V$INSTANCE;");
+ lastStdout = response.getStdout() != null ? response.getStdout() : "";
+ lastStderr = response.getStderr() != null ? response.getStderr() : "";
+ if (lastStderr.contains(status) || lastStdout.contains(status)) {
+ log.info("Oracle reached status '{}' after {} attempts", status, i + 1);
+ return;
+ }
+ } catch (Exception e) {
+ lastStderr = e.getMessage();
+ log.debug("Error polling Oracle status (attempt {}): {}", i + 1, e.getMessage());
+ }
+ if (i % 30 == 29) {
+ log.info("Still waiting for Oracle status '{}' after {} attempts. "
+ + "Last stdout: {}, Last stderr: {}", status, i + 1, lastStdout, lastStderr);
}
Thread.sleep(1000);
}
- throw new IllegalStateException("Oracle did not initialize properly");
+ throw new IllegalStateException(
+ String.format("Oracle did not reach status '%s'. Last stdout: %s, Last stderr: %s",
+ status, lastStdout, lastStderr));
+ }
+
+ private void retryCommand(String cmd, String expectedStatus) throws Exception {
+ for (int attempt = 1; attempt <= 3; attempt++) {
+ runSqlCmd(cmd);
+ try {
+ waitForOracleStatus(expectedStatus);
+ return;
+ } catch (IllegalStateException e) {
+ if (attempt == 3) {
+ throw e;
+ }
+ log.warn("Command '{}' did not lead to status '{}' (attempt {}), retrying...",
+ cmd, expectedStatus, attempt);
+ Thread.sleep(SLEEP_AFTER_COMMAND_MS);
+ }
+ }
}
private ContainerExecResult runSqlCmd(String cmd) throws Exception {