This is an automated email from the ASF dual-hosted git repository.
shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/shengquan-add-reconfigration
by this push:
new 94a302e38a make java end working
94a302e38a is described below
commit 94a302e38a5d9b80f4291cae2417ca7bdf1b5827
Author: Shengquan Ni <[email protected]>
AuthorDate: Sat Feb 14 21:23:48 2026 -0800
make java end working
---
.../handlers/control/update_executor_handler.py | 2 +-
.../main/python/core/runnables/network_receiver.py | 1 +
.../DataProcessorRPCHandlerInitializer.scala | 2 +-
.../InitializeExecutorHandler.scala | 2 +-
.../promisehandlers/UpdateExecutorHandler.scala | 2 +-
.../texera/amber/engine/e2e/ModifyLogicSpec.scala | 22 +++++++++++++++++++---
6 files changed, 24 insertions(+), 7 deletions(-)
diff --git
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
index a5bc1a6c0f..7449cbb4b5 100644
---
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
+++
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
@@ -1 +1 @@
-# Licensed to the Apache Software Foundation (ASF) under one # or more
contributor license agreements. See the NOTICE file # distributed with this
work for additional information # regarding copyright ownership. The ASF
licenses this file # to you under the Apache License, Version 2.0 (the #
"License"); you may not use this file except in compliance # with the License.
You may obtain a copy of the License at # #
http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable
law or agreed to in writing, # software distributed under the License is
distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY #
KIND, either express or implied. See the License for the # specific language
governing permissions and limitations # under the License. from
core.architecture.handlers.control.control_handler_base import ControlHandler
from proto.org.apache.texera.amber.engine.architecture.rpc import (
EmptyReturn, UpdateExecutorRequest, ) class Update
ExecutorHandler(ControlHandler): async def update_executor(self, req:
UpdateExecutorRequest) -> EmptyReturn:
self.context.executor_manager.update_executor(req.code, req.is_source)
return EmptyReturn()
\ No newline at end of file
+# Licensed to the Apache Software Foundation (ASF) under one # or more
contributor license agreements. See the NOTICE file # distributed with this
work for additional information # regarding copyright ownership. The ASF
licenses this file # to you under the Apache License, Version 2.0 (the #
"License"); you may not use this file except in compliance # with the License.
You may obtain a copy of the License at # #
http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable
law or agreed to in writing, # software distributed under the License is
distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY #
KIND, either express or implied. See the License for the # specific language
governing permissions and limitations # under the License. from
core.architecture.handlers.control.control_handler_base import ControlHandler
from proto.org.apache.texera.amber.engine.architecture.rpc import (
EmptyReturn, UpdateExecutorRequest, ) from loguru im
port logger class UpdateExecutorHandler(ControlHandler): async def
update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn:
self.context.executor_manager.update_executor(req.code, req.is_source)
return EmptyReturn()
\ No newline at end of file
diff --git a/amber/src/main/python/core/runnables/network_receiver.py
b/amber/src/main/python/core/runnables/network_receiver.py
index fd42a8f589..879b623b76 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -126,6 +126,7 @@ class NetworkReceiver(Runnable, Stoppable):
payload=python_control_message.payload,
)
)
+ logger.info(python_control_message.payload)
return shared_queue.in_mem_size()
self._proxy_server.register_control_handler(control_handler)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
index d2837860f5..cfbc40b587 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
@@ -72,7 +72,7 @@ class DataProcessorRPCHandlerInitializer(val dp:
DataProcessor)
override def noOperation(request: EmptyRequest, ctx: AsyncRPCContext):
Future[EmptyReturn] = ???
- def initializeExecutor(execInitInfo: OpExecInitInfo, workerIdx: Int,
workerCount: Int): Unit = {
+ def setupExecutor(execInitInfo: OpExecInitInfo, workerIdx: Int, workerCount:
Int): Unit = {
dp.executor = execInitInfo match {
case OpExecWithClassName(className, descString) =>
ExecFactory.newExecFromJavaClassName(className, descString, workerIdx,
workerCount)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index f6e7f3d8b2..a7fd26c434 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -42,7 +42,7 @@ trait InitializeExecutorHandler {
dp.serializationManager.setOpInitialization(req)
val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
cachedTotalWorkerCount = req.totalWorkerCount
- initializeExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
+ setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
EmptyReturn()
}
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
index 8cb0e30be2..5c2f65c257 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
@@ -35,7 +35,7 @@ trait UpdateExecutorHandler {
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
- initializeExecutor(request.newExecInitInfo, workerIdx,
cachedTotalWorkerCount)
+ setupExecutor(request.newExecInitInfo, workerIdx, cachedTotalWorkerCount)
dp.executor.open()
EmptyReturn()
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
index 2572a78137..699329bfae 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
@@ -25,7 +25,7 @@ import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
+import org.apache.texera.amber.core.executor.{OpExecInitInfo,
OpExecWithClassName, OpExecWithCode}
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.Tuple
@@ -38,7 +38,7 @@ import org.apache.texera.amber.engine.common.AmberRuntime
import org.apache.texera.amber.engine.common.client.AmberClient
import
org.apache.texera.amber.engine.e2e.TestUtils.{cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases, setUpWorkflowExecutionData}
import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
-import org.apache.texera.amber.operator.TestOperators.{pythonOpDesc,
pythonSourceOpDesc}
+import org.apache.texera.amber.operator.TestOperators.{mediumCsvScanOpDesc,
pythonOpDesc, pythonSourceOpDesc}
import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
import org.apache.texera.workflow.LogicalLink
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
@@ -64,6 +64,7 @@ class ModifyLogicSpec extends
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
implicit val timeout: Timeout = Timeout(5.seconds)
val logger = Logger("ModifyLogicSpecLogger")
+ val ctx = new WorkflowContext()
override protected def beforeEach(): Unit = {
setUpWorkflowExecutionData()
@@ -94,7 +95,7 @@ class ModifyLogicSpec extends
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
checkResultLambda: (Map[OperatorIdentity,
List[Tuple]]) => Boolean
): Unit = {
val workflow =
- TestUtils.buildWorkflow(operators, links, new WorkflowContext())
+ TestUtils.buildWorkflow(operators, links, ctx)
val client =
new AmberClient(
system,
@@ -177,4 +178,19 @@ class ModifyLogicSpec extends
TestKit(ActorSystem("ModifyLogicSpec", AmberRuntim
)
}
+ "Engine" should "be able to modify a java operator in workflow" in {
+ val sourceOpDesc = mediumCsvScanOpDesc()
+ val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region",
"ShouldMatchNone")
+ val keywordMatchManyOpDesc = TestOperators.keywordSearchOpDesc("Region",
"Asia")
+ shouldReconfigure(List(sourceOpDesc, keywordMatchNoneOpDesc),
List(LogicalLink(
+ sourceOpDesc.operatorIdentifier,
+ PortIdentity(),
+ keywordMatchNoneOpDesc.operatorIdentifier,
+ PortIdentity()
+ )),
+ keywordMatchNoneOpDesc,
keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId,
ctx.executionId).opExecInitInfo,
+ results => results(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty
+ )
+ }
+
}