This is an automated email from the ASF dual-hosted git repository.
shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/shengquan-add-reconfigration
by this push:
new 7d0342ef70 fix python end
7d0342ef70 is described below
commit 7d0342ef70418fe36bae11510dcf07afc1d2b70f
Author: Shengquan Ni <[email protected]>
AuthorDate: Sun Feb 15 01:37:46 2026 -0800
fix python end
---
.../handlers/control/update_executor_handler.py | 2 +-
.../rpc/async_rpc_handler_initializer.py | 2 +
.../src/main/python/core/models/internal_queue.py | 3 +
.../main/python/core/runnables/network_receiver.py | 1 -
.../promisehandlers/ReconfigurationHandler.scala | 9 +-
.../texera/amber/engine/e2e/ModifyLogicSpec.scala | 130 ++++++++++++++-------
.../texera/amber/operator/TestOperators.scala | 5 +-
7 files changed, 104 insertions(+), 48 deletions(-)
diff --git
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
index 7449cbb4b5..a79d31c180 100644
---
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
+++
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
@@ -1 +1 @@
-# Licensed to the Apache Software Foundation (ASF) under one # or more
contributor license agreements. See the NOTICE file # distributed with this
work for additional information # regarding copyright ownership. The ASF
licenses this file # to you under the Apache License, Version 2.0 (the #
"License"); you may not use this file except in compliance # with the License.
You may obtain a copy of the License at # #
http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable
law or agreed to in writing, # software distributed under the License is
distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY #
KIND, either express or implied. See the License for the # specific language
governing permissions and limitations # under the License. from
core.architecture.handlers.control.control_handler_base import ControlHandler
from proto.org.apache.texera.amber.engine.architecture.rpc import (
EmptyReturn, UpdateExecutorRequest, ) from loguru
import logger class UpdateExecutorHandler(ControlHandler): async def
update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn:
self.context.executor_manager.update_executor(req.code, req.is_source)
return EmptyReturn()
\ No newline at end of file
+# Licensed to the Apache Software Foundation (ASF) under one # or more
contributor license agreements. See the NOTICE file # distributed with this
work for additional information # regarding copyright ownership. The ASF
licenses this file # to you under the Apache License, Version 2.0 (the #
"License"); you may not use this file except in compliance # with the License.
You may obtain a copy of the License at # #
http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable
law or agreed to in writing, # software distributed under the License is
distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY #
KIND, either express or implied. See the License for the # specific language
governing permissions and limitations # under the License. from
core.architecture.handlers.control.control_handler_base import ControlHandler
from proto.org.apache.texera.amber.engine.architecture.rpc import (
EmptyReturn, UpdateExecutorRequest, ) from core.util
import get_one_of from proto.org.apache.texera.amber.core import
OpExecWithCode class UpdateExecutorHandler(ControlHandler): async def
update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn:
op_exec_with_code: OpExecWithCode = get_one_of(req.new_exec_init_info)
self.context.executor_manager.update_executor(op_exec_with_code.code, False)
return EmptyReturn()
\ No newline at end of file
diff --git
a/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
b/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
index c2574028a1..7b60bc27fb 100644
---
a/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
+++
b/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
@@ -45,6 +45,7 @@ from
core.architecture.handlers.control.replay_current_tuple_handler import (
from core.architecture.handlers.control.resume_worker_handler import
ResumeWorkerHandler
from core.architecture.handlers.control.start_channel_handler import
StartChannelHandler
from core.architecture.handlers.control.start_worker_handler import
StartWorkerHandler
+from core.architecture.handlers.control.update_executor_handler import
UpdateExecutorHandler
class AsyncRPCHandlerInitializer(
@@ -64,5 +65,6 @@ class AsyncRPCHandlerInitializer(
StartChannelHandler,
EndChannelHandler,
NoOperationHandler,
+ UpdateExecutorHandler,
):
pass
diff --git a/amber/src/main/python/core/models/internal_queue.py
b/amber/src/main/python/core/models/internal_queue.py
index abc1793ff6..ae0247aa2d 100644
--- a/amber/src/main/python/core/models/internal_queue.py
+++ b/amber/src/main/python/core/models/internal_queue.py
@@ -79,6 +79,9 @@ class InternalQueue(IQueue):
if item.tag not in self._queue_ids:
self._queue.add_sub_queue(item.tag, 1 if item.tag.is_control
else 2)
self._queue_ids.add(item.tag)
+ if len(self._queue_state) > 0 and not item.tag.is_control:
+ # if paused, then the newly added queue will also be
paused.
+ self._queue.disable(item.tag)
if isinstance(item, (DataElement, InternalMarker, ECMElement)):
self._queue.put(item.tag, item)
elif isinstance(item, DCMElement):
diff --git a/amber/src/main/python/core/runnables/network_receiver.py
b/amber/src/main/python/core/runnables/network_receiver.py
index 879b623b76..fd42a8f589 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -126,7 +126,6 @@ class NetworkReceiver(Runnable, Stoppable):
payload=python_control_message.payload,
)
)
- logger.info(python_control_message.payload)
return shared_queue.in_mem_size()
self._proxy_server.register_control_handler(control_handler)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
index 573306bc06..dd31aae420 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
@@ -37,7 +37,14 @@ trait ReconfigurationHandler {
override def reconfigureWorkflow(msg: WorkflowReconfigureRequest, ctx:
AsyncRPCContext): Future[EmptyReturn] = {
val futures = mutable.ArrayBuffer[Future[_]]()
-
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator,
msg).foreach{
+ val friesComponents =
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator,
msg)
+ val needToSendECMToSources = friesComponents.exists(comp =>
comp.sources.exists(sourceOp =>
cp.workflowScheduler.physicalPlan.getOperator(sourceOp).isSourceOperator))
+ if(needToSendECMToSources){
+ throw new IllegalStateException(
+ "Reconfiguration cannot be propagated through source operators"
+ )
+ }
+ friesComponents.foreach{
friesComponent =>
if(friesComponent.scope.size == 1){
val updateExecutorRequest = friesComponent.reconfigurations.head
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
index 699329bfae..41016a8144 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
@@ -90,10 +90,9 @@ class ModifyLogicSpec extends
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
def shouldReconfigure(
operators: List[LogicalOp],
links: List[LogicalLink],
- targetOp: LogicalOp,
- newOpExecInitInfo: OpExecInitInfo,
- checkResultLambda: (Map[OperatorIdentity,
List[Tuple]]) => Boolean
- ): Unit = {
+ targetOps: Seq[LogicalOp],
+ newOpExecInitInfo: OpExecInitInfo
+ ): Map[OperatorIdentity, List[Tuple]] = {
val workflow =
TestUtils.buildWorkflow(operators, links, ctx)
val client =
@@ -105,43 +104,42 @@ class ModifyLogicSpec extends
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
error => {}
)
val completion = Promise[Unit]()
+ var result: Map[OperatorIdentity, List[Tuple]] = null
client
.registerCallback[ExecutionStateUpdate](evt => {
if (evt.state == COMPLETED) {
-// checkResultLambda(workflow.logicalPlan.getTerminalOperatorIds
-// .filter(terminalOpId => {
-// val uri = getResultUriByLogicalPortId(
-// workflow.context.executionId,
-// terminalOpId,
-// PortIdentity()
-// )
-// uri.nonEmpty
-// })
-// .map(terminalOpId => {
-// //TODO: remove the delay after fixing the issue of reporting
"completed" status too early.
-// Thread.sleep(1000)
-// val uri = getResultUriByLogicalPortId(
-// workflow.context.executionId,
-// terminalOpId,
-// PortIdentity()
-// ).get
-// terminalOpId -> DocumentFactory
-// .openDocument(uri)
-// ._1
-// .asInstanceOf[VirtualDocument[Tuple]]
-// .get()
-// .toList
-// })
-// .toMap)
+ result = workflow.logicalPlan.getTerminalOperatorIds
+ .filter(terminalOpId => {
+ val uri = getResultUriByLogicalPortId(
+ workflow.context.executionId,
+ terminalOpId,
+ PortIdentity()
+ )
+ uri.nonEmpty
+ })
+ .map(terminalOpId => {
+ //TODO: remove the delay after fixing the issue of reporting
"completed" status too early.
+ Thread.sleep(1000)
+ val uri = getResultUriByLogicalPortId(
+ workflow.context.executionId,
+ terminalOpId,
+ PortIdentity()
+ ).get
+ terminalOpId -> DocumentFactory
+ .openDocument(uri)
+ ._1
+ .asInstanceOf[VirtualDocument[Tuple]]
+ .get()
+ .toList
+ })
+ .toMap
completion.setDone()
}
})
Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
Thread.sleep(4000)
- val physicalOps =
workflow.physicalPlan.getPhysicalOpsOfLogicalOp(targetOp.operatorIdentifier)
- assert(physicalOps.nonEmpty && physicalOps.length == 1,
- "cannot reconfigure more than one physical operator in this test")
+ val physicalOps = targetOps.flatMap(op =>
workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier))
Await.result(client.controllerInterface.reconfigureWorkflow(WorkflowReconfigureRequest(
reconfiguration =
physicalOps.map(op => UpdateExecutorRequest(op.id, newOpExecInitInfo)
@@ -149,48 +147,94 @@ class ModifyLogicSpec extends
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
Thread.sleep(400)
Await.result(completion, Duration.fromMinutes(1))
+ result
}
"Engine" should "be able to modify a python UDF worker in workflow" in {
+ val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
val udfOpDesc = pythonOpDesc()
- val sourceOpDesc = pythonSourceOpDesc(5000)
val code = """
|from pytexera import *
|
|class ProcessTupleOperator(UDFOperatorV2):
| @overrides
| def process_tuple(self, tuple_: Tuple, port: int) ->
Iterator[Optional[TupleLike]]:
- | tuple_['field_2'] = tuple_['field_2'] +
'_reconfigured'
+ | tuple_['Region'] = tuple_['Region'] + '_reconfigured'
| yield tuple_
|""".stripMargin
- shouldReconfigure(List(sourceOpDesc, udfOpDesc), List(LogicalLink(
+ val result = shouldReconfigure(List(sourceOpDesc, udfOpDesc),
List(LogicalLink(
sourceOpDesc.operatorIdentifier,
PortIdentity(),
udfOpDesc.operatorIdentifier,
PortIdentity()
)),
- udfOpDesc, OpExecWithCode(code, "python"),
- results => results(udfOpDesc.operatorIdentifier).exists {
- t =>
t.getField("field_2").asInstanceOf[String].contains("_reconfigured")
- }
- )
+ Seq(udfOpDesc), OpExecWithCode(code, "python"))
+ assert(result(udfOpDesc.operatorIdentifier).exists {
+ t => t.getField("Region").asInstanceOf[String].contains("_reconfigured")
+ })
}
"Engine" should "be able to modify a java operator in workflow" in {
val sourceOpDesc = mediumCsvScanOpDesc()
val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region",
"ShouldMatchNone")
val keywordMatchManyOpDesc = TestOperators.keywordSearchOpDesc("Region",
"Asia")
- shouldReconfigure(List(sourceOpDesc, keywordMatchNoneOpDesc),
List(LogicalLink(
+ val result = shouldReconfigure(List(sourceOpDesc, keywordMatchNoneOpDesc),
List(LogicalLink(
sourceOpDesc.operatorIdentifier,
PortIdentity(),
keywordMatchNoneOpDesc.operatorIdentifier,
PortIdentity()
)),
- keywordMatchNoneOpDesc,
keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId,
ctx.executionId).opExecInitInfo,
- results => results(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty
+ Seq(keywordMatchNoneOpDesc),
keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId,
ctx.executionId).opExecInitInfo
)
+ assert(result(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty)
+ }
+
+ "Engine" should "not be able to modify a source operator in workflow" in {
+ val sourceOpDesc = mediumCsvScanOpDesc()
+ val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region",
"ShouldMatchNone")
+ val ex = intercept[Throwable] {
+ shouldReconfigure(List(sourceOpDesc, keywordMatchNoneOpDesc),
List(LogicalLink(
+ sourceOpDesc.operatorIdentifier,
+ PortIdentity(),
+ keywordMatchNoneOpDesc.operatorIdentifier,
+ PortIdentity()
+ )),
+ Seq(sourceOpDesc), sourceOpDesc.getPhysicalOp(ctx.workflowId,
ctx.executionId).opExecInitInfo
+ )}
+ assert(ex.getMessage == "java.lang.IllegalStateException: Reconfiguration
cannot be propagated through source operators")
+ }
+
+ "Engine" should "be able to modify two python UDFs in workflow" in {
+ val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
+ val udfOpDesc1 = pythonOpDesc()
+ val udfOpDesc2 = pythonOpDesc()
+ val code = """
+ |from pytexera import *
+ |
+ |class ProcessTupleOperator(UDFOperatorV2):
+ | @overrides
+ | def process_tuple(self, tuple_: Tuple, port: int) ->
Iterator[Optional[TupleLike]]:
+ | tuple_['Region'] = tuple_['Region'] + '_reconfigured'
+ | yield tuple_
+ |""".stripMargin
+
+ val result = shouldReconfigure(List(sourceOpDesc, udfOpDesc1, udfOpDesc2),
List(LogicalLink(
+ sourceOpDesc.operatorIdentifier,
+ PortIdentity(),
+ udfOpDesc1.operatorIdentifier,
+ PortIdentity()
+ ), LogicalLink(
+ udfOpDesc1.operatorIdentifier,
+ PortIdentity(),
+ udfOpDesc2.operatorIdentifier,
+ PortIdentity()
+ )),
+ Seq(udfOpDesc1, udfOpDesc2), OpExecWithCode(code, "python"))
+ assert(result(udfOpDesc2.operatorIdentifier).exists {
+ t =>
t.getField("Region").asInstanceOf[String].contains("_reconfigured_reconfigured")
+ })
}
}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
index 0f6fcd669a..0dedd09e1f 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
@@ -182,18 +182,19 @@ object TestOperators {
udf
}
- def pythonSourceOpDesc(num_tuple: Int): PythonUDFSourceOpDescV2 = {
+ def pythonSourceOpDesc(num_tuple: Int, delay_in_sec: Double):
PythonUDFSourceOpDescV2 = {
val udf = new PythonUDFSourceOpDescV2()
udf.workers = 1
udf.columns = List(new Attribute("field_1", AttributeType.INTEGER), new
Attribute("field_2", AttributeType.STRING))
udf.code =
s"""
|from pytexera import *
- |
+ |from time import sleep
|class ProcessTupleOperator(UDFSourceOperator):
| @overrides
| def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
| for i in range($num_tuple):
+ | sleep($delay_in_sec)
| yield {'field_1': i, 'field_2': str(i)}
|""".stripMargin
udf