This is an automated email from the ASF dual-hosted git repository. xiaozhenliu pushed a commit to branch xiaozhen-tentatively-fix-e2e-tests in repository https://gitbox.apache.org/repos/asf/texera.git
commit 84e02530c77d70a27f7a5c5a97ac2ed7ad8c0d40 Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Oct 20 16:15:21 2025 -0700 fix(amber): add timeouts and retries to e2e tests --- .../amber/engine/e2e/DataProcessingSpec.scala | 59 +++++++++++++++++----- .../org/apache/amber/engine/e2e/PauseSpec.scala | 57 ++++++++++++++++----- 2 files changed, 91 insertions(+), 25 deletions(-) diff --git a/amber/src/test/scala/org/apache/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/amber/engine/e2e/DataProcessingSpec.scala index cea348764a..50ac82add6 100644 --- a/amber/src/test/scala/org/apache/amber/engine/e2e/DataProcessingSpec.scala +++ b/amber/src/test/scala/org/apache/amber/engine/e2e/DataProcessingSpec.scala @@ -126,9 +126,42 @@ class DataProcessingSpec completion.setDone() } }) - Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ())) - Await.result(completion, Duration.fromMinutes(1)) - results + try { + Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ())) + Await.result(completion, Duration.fromMinutes(1)) + results + } finally { + client.shutdown() + } + } + + /** + * In the CI environment, there is a chance that executeWorkflow does not receive "COMPLETED" status. + * Until we find the root cause of this issue, we use a retry mechanism here to stabilize CI runs. + */ + def executeWorkflowWithRetries( + workflow: Workflow, + maxRetries: Int = 3 + ): Map[OperatorIdentity, List[Tuple]] = { + var attempt = 0 + while (attempt <= maxRetries) { + try { + return executeWorkflow(workflow) + } catch { + case _: com.twitter.util.TimeoutException => + attempt += 1 + if (attempt > maxRetries) { + throw new com.twitter.util.TimeoutException("executeWorkflow timed out after retries") + } + // Need to reset test texera_db state before retry + cleanupWorkflowExecutionData() + setUpWorkflowExecutionData() + case otherError: Throwable => + throw otherError + } + } + // Code should not reach here. 
+ throw new RuntimeException("unreachable") } "Engine" should "execute headerlessCsv workflow normally" in { @@ -138,7 +171,7 @@ class DataProcessingSpec List(), workflowContext ) - val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier) + val results = executeWorkflowWithRetries(workflow)(headerlessCsvOpDesc.operatorIdentifier) assert(results.size == 100) } @@ -150,7 +183,7 @@ class DataProcessingSpec List(), workflowContext ) - val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier) + val results = executeWorkflowWithRetries(workflow)(headerlessCsvOpDesc.operatorIdentifier) assert(results.size == 100) } @@ -162,7 +195,7 @@ class DataProcessingSpec List(), workflowContext ) - val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier) + val results = executeWorkflowWithRetries(workflow)(jsonlOp.operatorIdentifier) assert(results.size == 100) @@ -185,7 +218,7 @@ class DataProcessingSpec List(), workflowContext ) - val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier) + val results = executeWorkflowWithRetries(workflow)(jsonlOp.operatorIdentifier) assert(results.size == 1000) @@ -216,7 +249,7 @@ class DataProcessingSpec ), workflowContext ) - executeWorkflow(workflow) + executeWorkflowWithRetries(workflow) } "Engine" should "execute csv workflow normally" in { @@ -226,7 +259,7 @@ class DataProcessingSpec List(), workflowContext ) - executeWorkflow(workflow) + executeWorkflowWithRetries(workflow) } "Engine" should "execute csv->keyword workflow normally" in { @@ -244,7 +277,7 @@ class DataProcessingSpec ), workflowContext ) - executeWorkflow(workflow) + executeWorkflowWithRetries(workflow) } "Engine" should "execute csv->keyword->count workflow normally" in { @@ -270,7 +303,7 @@ class DataProcessingSpec ), workflowContext ) - executeWorkflow(workflow) + executeWorkflowWithRetries(workflow) } "Engine" should "execute csv->keyword->averageAndGroupBy workflow normally" in { @@ -300,7 +333,7 @@ class 
DataProcessingSpec ), workflowContext ) - executeWorkflow(workflow) + executeWorkflowWithRetries(workflow) } "Engine" should "execute csv->(csv->)->join workflow normally" in { @@ -329,6 +362,6 @@ class DataProcessingSpec ), workflowContext ) - executeWorkflow(workflow) + executeWorkflowWithRetries(workflow) } } diff --git a/amber/src/test/scala/org/apache/amber/engine/e2e/PauseSpec.scala b/amber/src/test/scala/org/apache/amber/engine/e2e/PauseSpec.scala index c905ef7e68..9aedc272d9 100644 --- a/amber/src/test/scala/org/apache/amber/engine/e2e/PauseSpec.scala +++ b/amber/src/test/scala/org/apache/amber/engine/e2e/PauseSpec.scala @@ -22,7 +22,7 @@ package org.apache.amber.engine.e2e import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout -import com.twitter.util.{Await, Promise} +import com.twitter.util.{Await, Duration, Promise} import com.typesafe.scalalogging.Logger import org.apache.amber.clustering.SingleNodeListener import org.apache.amber.core.workflow.{PortIdentity, WorkflowContext} @@ -95,21 +95,54 @@ class PauseSpec completion.setDone() } }) - Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ())) - Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ())) - Thread.sleep(4000) - Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ())) - Thread.sleep(400) - Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ())) - Thread.sleep(4000) - Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ())) - Await.result(completion) + try { + Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ())) + Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ())) + Thread.sleep(4000) + Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ())) + Thread.sleep(400) + Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ())) + Thread.sleep(4000) + 
Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ())) + Await.result(completion, Duration.fromMinutes(1)) + } finally { + client.shutdown() + } + } + + /** + * In the CI environment, there is a chance that shouldPause does not receive "COMPLETED" status. + * Until we find the root cause of this issue, we use a retry mechanism here to stabilize CI runs. + */ + def shouldPauseWithRetries( + operators: List[LogicalOp], + links: List[LogicalLink], + maxRetries: Int = 3 + ): Unit = { + var attempt = 0 + while (attempt <= maxRetries) { + try { + shouldPause(operators, links) + return + } catch { + case _: com.twitter.util.TimeoutException => + attempt += 1 + if (attempt > maxRetries) { + throw new com.twitter.util.TimeoutException("shouldPause timed out after retries") + } + // Need to reset test texera_db state before retry + cleanupWorkflowExecutionData() + setUpWorkflowExecutionData() + case otherError: Throwable => + throw otherError + } + } + } "Engine" should "be able to pause csv workflow" in { val csvOpDesc = TestOperators.mediumCsvScanOpDesc() logger.info(s"csv-id ${csvOpDesc.operatorIdentifier}") - shouldPause( + shouldPauseWithRetries( List(csvOpDesc), List() ) @@ -121,7 +154,7 @@ class PauseSpec logger.info( s"csv-id ${csvOpDesc.operatorIdentifier}, keyword-id ${keywordOpDesc.operatorIdentifier}" ) - shouldPause( + shouldPauseWithRetries( List(csvOpDesc, keywordOpDesc), List( LogicalLink(
