This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new d8338d1241 update
d8338d1241 is described below

commit d8338d124197c1343ecb7b03cb9adf4e307effe3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Feb 11 23:08:34 2026 -0800

    update
---
 .../messaginglayer/AmberFIFOChannel.scala           | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala
index 7917721c9c..2556c55a36 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala
@@ -41,18 +41,17 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging {
   private var portId: Option[PortIdentity] = None
 
   def acceptMessage(msg: WorkflowFIFOMessage): Unit = {
-    //val seq = msg.sequenceNumber
-    //val payload = msg.payload
-    //if (isDuplicated(seq)) {
-    //  logger.debug(
-    //    s"received duplicated message $payload with seq = $seq while current seq = $current"
-    //  )
-    //} else if (isAhead(seq)) {
-    //  logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current")
-    //  stash(seq, msg)
-    //} else {
+    //channel remove
+    val seq = msg.sequenceNumber
+    val payload = msg.payload
+    if (isDuplicated(seq)) {
+      logger.debug(s"received duplicated message $payload with seq = $seq while current seq = $current")
+    } else if (isAhead(seq)) {
+      logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current")
+      stash(seq, msg)
+    } else {
       enforceFIFO(msg)
-    //}
+    }
   }
 
   def getCurrentSeq: Long = current

Reply via email to