This is an automated email from the ASF dual-hosted git repository.

xiaozhenliu pushed a commit to branch xiaozhen-tentatively-fix-e2e-tests
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 84e02530c77d70a27f7a5c5a97ac2ed7ad8c0d40
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Oct 20 16:15:21 2025 -0700

    fix(amber): add timeouts and retries to e2e tests
---
 .../amber/engine/e2e/DataProcessingSpec.scala      | 59 +++++++++++++++++-----
 .../org/apache/amber/engine/e2e/PauseSpec.scala    | 57 ++++++++++++++++-----
 2 files changed, 91 insertions(+), 25 deletions(-)

diff --git a/amber/src/test/scala/org/apache/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/amber/engine/e2e/DataProcessingSpec.scala
index cea348764a..50ac82add6 100644
--- a/amber/src/test/scala/org/apache/amber/engine/e2e/DataProcessingSpec.scala
+++ b/amber/src/test/scala/org/apache/amber/engine/e2e/DataProcessingSpec.scala
@@ -126,9 +126,42 @@ class DataProcessingSpec
           completion.setDone()
         }
       })
-    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
-    Await.result(completion, Duration.fromMinutes(1))
-    results
+    try {
+      Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+      Await.result(completion, Duration.fromMinutes(1))
+      results
+    } finally {
+      client.shutdown()
+    }
+  }
+
+  /**
+    * In the CI environment, there is a chance that executeWorkflow does not receive "COMPLETED" status.
+    * Until we find the root cause of this issue, we use a retry mechanism here to stabilize CI runs.
+    */
+  def executeWorkflowWithRetries(
+      workflow: Workflow,
+      maxRetries: Int = 3
+  ): Map[OperatorIdentity, List[Tuple]] = {
+    var attempt = 0
+    while (attempt <= maxRetries) {
+      try {
+        return executeWorkflow(workflow)
+      } catch {
+        case _: com.twitter.util.TimeoutException =>
+          attempt += 1
+          if (attempt > maxRetries) {
+            throw new com.twitter.util.TimeoutException("executeWorkflow timed out after retries")
+          }
+          // Need to reset test texera_db state before retry
+          cleanupWorkflowExecutionData()
+          setUpWorkflowExecutionData()
+        case otherError: Throwable =>
+          throw otherError
+      }
+    }
+    // Code should not reach here.
+    throw new RuntimeException("unreachable")
   }
 
   "Engine" should "execute headerlessCsv workflow normally" in {
@@ -138,7 +171,7 @@ class DataProcessingSpec
       List(),
       workflowContext
     )
-    val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier)
+    val results = executeWorkflowWithRetries(workflow)(headerlessCsvOpDesc.operatorIdentifier)
 
     assert(results.size == 100)
   }
@@ -150,7 +183,7 @@ class DataProcessingSpec
       List(),
       workflowContext
     )
-    val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier)
+    val results = executeWorkflowWithRetries(workflow)(headerlessCsvOpDesc.operatorIdentifier)
 
     assert(results.size == 100)
   }
@@ -162,7 +195,7 @@ class DataProcessingSpec
       List(),
       workflowContext
     )
-    val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier)
+    val results = executeWorkflowWithRetries(workflow)(jsonlOp.operatorIdentifier)
 
     assert(results.size == 100)
 
@@ -185,7 +218,7 @@ class DataProcessingSpec
       List(),
       workflowContext
     )
-    val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier)
+    val results = executeWorkflowWithRetries(workflow)(jsonlOp.operatorIdentifier)
 
     assert(results.size == 1000)
 
@@ -216,7 +249,7 @@ class DataProcessingSpec
       ),
       workflowContext
     )
-    executeWorkflow(workflow)
+    executeWorkflowWithRetries(workflow)
   }
 
   "Engine" should "execute csv workflow normally" in {
@@ -226,7 +259,7 @@ class DataProcessingSpec
       List(),
       workflowContext
     )
-    executeWorkflow(workflow)
+    executeWorkflowWithRetries(workflow)
   }
 
   "Engine" should "execute csv->keyword workflow normally" in {
@@ -244,7 +277,7 @@ class DataProcessingSpec
       ),
       workflowContext
     )
-    executeWorkflow(workflow)
+    executeWorkflowWithRetries(workflow)
   }
 
   "Engine" should "execute csv->keyword->count workflow normally" in {
@@ -270,7 +303,7 @@ class DataProcessingSpec
       ),
       workflowContext
     )
-    executeWorkflow(workflow)
+    executeWorkflowWithRetries(workflow)
   }
 
   "Engine" should "execute csv->keyword->averageAndGroupBy workflow normally" in {
@@ -300,7 +333,7 @@ class DataProcessingSpec
       ),
       workflowContext
     )
-    executeWorkflow(workflow)
+    executeWorkflowWithRetries(workflow)
   }
 
   "Engine" should "execute csv->(csv->)->join workflow normally" in {
@@ -329,6 +362,6 @@ class DataProcessingSpec
       ),
       workflowContext
     )
-    executeWorkflow(workflow)
+    executeWorkflowWithRetries(workflow)
   }
 }
diff --git a/amber/src/test/scala/org/apache/amber/engine/e2e/PauseSpec.scala b/amber/src/test/scala/org/apache/amber/engine/e2e/PauseSpec.scala
index c905ef7e68..9aedc272d9 100644
--- a/amber/src/test/scala/org/apache/amber/engine/e2e/PauseSpec.scala
+++ b/amber/src/test/scala/org/apache/amber/engine/e2e/PauseSpec.scala
@@ -22,7 +22,7 @@ package org.apache.amber.engine.e2e
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
-import com.twitter.util.{Await, Promise}
+import com.twitter.util.{Await, Duration, Promise}
 import com.typesafe.scalalogging.Logger
 import org.apache.amber.clustering.SingleNodeListener
 import org.apache.amber.core.workflow.{PortIdentity, WorkflowContext}
@@ -95,21 +95,54 @@ class PauseSpec
           completion.setDone()
         }
       })
-    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
-    Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
-    Thread.sleep(4000)
-    Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
-    Thread.sleep(400)
-    Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
-    Thread.sleep(4000)
-    Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
-    Await.result(completion)
+    try {
+      Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+      Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
+      Thread.sleep(4000)
+      Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
+      Thread.sleep(400)
+      Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
+      Thread.sleep(4000)
+      Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
+      Await.result(completion, Duration.fromMinutes(1))
+    } finally {
+      client.shutdown()
+    }
+  }
+
+  /**
+    * In the CI environment, there is a chance that shouldPause does not receive "COMPLETED" status.
+    * Until we find the root cause of this issue, we use a retry mechanism here to stabilize CI runs.
+    */
+  def shouldPauseWithRetries(
+      operators: List[LogicalOp],
+      links: List[LogicalLink],
+      maxRetries: Int = 3
+  ): Unit = {
+    var attempt = 0
+    while (attempt <= maxRetries) {
+      try {
+        shouldPause(operators, links)
+        return
+      } catch {
+        case _: com.twitter.util.TimeoutException =>
+          attempt += 1
+          if (attempt > maxRetries) {
+            throw new com.twitter.util.TimeoutException("shouldPause timed out after retries")
+          }
+          // Need to reset test texera_db state before retry
+          cleanupWorkflowExecutionData()
+          setUpWorkflowExecutionData()
+        case otherError: Throwable =>
+          throw otherError
+      }
+    }
   }
 
   "Engine" should "be able to pause csv workflow" in {
     val csvOpDesc = TestOperators.mediumCsvScanOpDesc()
     logger.info(s"csv-id ${csvOpDesc.operatorIdentifier}")
-    shouldPause(
+    shouldPauseWithRetries(
       List(csvOpDesc),
       List()
     )
@@ -121,7 +154,7 @@ class PauseSpec
     logger.info(
       s"csv-id ${csvOpDesc.operatorIdentifier}, keyword-id ${keywordOpDesc.operatorIdentifier}"
     )
-    shouldPause(
+    shouldPauseWithRetries(
       List(csvOpDesc, keywordOpDesc),
       List(
         LogicalLink(

Reply via email to