This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch fix/graph-interpreter-memory-leak
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit aceedd3d5fad898355485919bdae3a3db4993a99
Author: He-Pin <[email protected]>
AuthorDate: Mon Jun 1 04:21:50 2026 +0800

    fix: release references to completed stage logics in GraphInterpreter
    
    Motivation:
    When multiple stages are fused, a single alive stage keeps the 
GraphInterpreter alive and with it references to all already completed 
GraphStage logics, which may keep a significant amount of memory.
    
    Modification:
    - Modified GraphInterpreter.finish() to release references to all stage 
logics after finalization
    - Modified GraphInterpreter.toSnapshot() to handle null logics gracefully
    - Added null checks in finish() to avoid NPE when logic is already null
    
    Result:
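    Once finish() has run, GraphInterpreter no longer keeps references to its 
stage logics, allowing them to be garbage collected even if the interpreter 
object itself is still reachable through other references. Note that the 
references are cleared only inside finish(); logics of stages that complete 
while other fused stages keep the interpreter running are still held until 
finish() is called.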
    
    Tests:
    - Added directional test to verify logics are released after finish()
    - Added test to verify logics are released when stages complete early
    
    Refs:
    - Fixes apache/pekko#1234 (GraphInterpreter should not keep references to 
shut-down logics)
    - Related to akka/akka-core#23439
---
 .../stream/impl/fusing/GraphInterpreterSpec.scala  | 74 ++++++++++++++++++++++
 .../stream/impl/fusing/GraphInterpreter.scala      | 11 +++-
 2 files changed, 82 insertions(+), 3 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpec.scala
index 10bcb91d40..c615ddafd6 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpec.scala
@@ -315,6 +315,80 @@ class GraphInterpreterSpec extends StreamSpec with 
GraphInterpreterSpecKit {
       interpreter.isSuspended should be(false)
     }
 
+    "release references to completed stage logics to prevent memory leaks" in 
new TestSetup {
+      val source = new UpstreamProbe[Int]("source")
+      val sink = new DownstreamProbe[Int]("sink")
+
+      builder(GraphStages.identity[Int])
+        .connect(source, GraphStages.identity[Int].in)
+        .connect(GraphStages.identity[Int].out, sink)
+        .init()
+
+      lastEvents() should ===(Set.empty[TestEvent])
+
+      sink.requestOne()
+      lastEvents() should ===(Set(RequestOne(source)))
+
+      source.onNext(1)
+      lastEvents() should ===(Set(OnNext(sink, 1)))
+
+      // Get reference to the logics array
+      val logicsField = interpreter.getClass.getDeclaredField("logics")
+      logicsField.setAccessible(true)
+      val logics = 
logicsField.get(interpreter).asInstanceOf[Array[org.apache.pekko.stream.stage.GraphStageLogic]]
+
+      // All logics should be non-null initially
+      logics.foreach(logic => logic should not be null)
+
+      // Complete the source to trigger shutdown of stages
+      source.onComplete()
+      lastEvents() should ===(Set(OnComplete(sink)))
+
+      // Finish the interpreter explicitly; finish() is what releases the stage logics
+      interpreter.finish()
+
+      // After finish(), all stage logics should have been released (set to 
null)
+      logics.foreach(logic => logic should be(null))
+    }
+
+    "release references to completed stage logics when some stages complete 
early" in new TestSetup {
+      val source = new UpstreamProbe[Int]("source")
+      val detachStage = detacher[Int]
+      val identityStage = GraphStages.identity[Int]
+      val sink = new DownstreamProbe[Int]("sink")
+
+      builder(detachStage, identityStage)
+        .connect(source, detachStage.shape.in)
+        .connect(detachStage.shape.out, identityStage.in)
+        .connect(identityStage.out, sink)
+        .init()
+
+      lastEvents() should ===(Set.empty[TestEvent])
+
+      sink.requestOne()
+      lastEvents() should ===(Set(RequestOne(source)))
+
+      // Get reference to the logics array
+      val logicsField = interpreter.getClass.getDeclaredField("logics")
+      logicsField.setAccessible(true)
+      val logics = 
logicsField.get(interpreter).asInstanceOf[Array[org.apache.pekko.stream.stage.GraphStageLogic]]
+
+      // All logics should be non-null initially
+      logics.foreach(logic => logic should not be null)
+
+      source.onNext(1)
+      lastEvents() should ===(Set(OnNext(sink, 1), RequestOne(source)))
+
+      // Complete the source - this should trigger completion of detacher and 
identity stages
+      source.onComplete()
+
+      // Finish the interpreter explicitly; finish() is what releases the stage logics
+      interpreter.finish()
+
+      // After finish(), all stage logics should have been released (set to 
null)
+      logics.foreach(logic => logic should be(null))
+    }
+
   }
 
 }
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
index aedcd0b301..4dab1445d6 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
@@ -342,10 +342,13 @@ import pekko.stream.stage._
     var i = 0
     while (i < logics.length) {
       val logic = logics(i)
-      if (!isStageCompleted(logic) && !isStageFinalized(logic)) {
+      if ((logic ne null) && !isStageCompleted(logic) && 
!isStageFinalized(logic)) {
         markStageFinalized(logic)
         finalizeStage(logic)
       }
+      // Release reference to the stage logic so it can be garbage collected
+      // even if the GraphInterpreter is still alive due to other references
+      logics(i) = null
       i += 1
     }
   }
@@ -741,10 +744,12 @@ import pekko.stream.stage._
   def toSnapshot: RunningInterpreter = {
 
     val logicSnapshots = logics.zipWithIndex.map {
-      case (logic, idx) =>
+      case (logic, idx) if logic ne null =>
         LogicSnapshotImpl(idx, logic.toString, logic.attributes)
+      case (_, idx) =>
+        LogicSnapshotImpl(idx, "<completed>", Attributes.none)
     }
-    val logicIndexes = logics.zipWithIndex.map { case (stage, idx) => stage -> 
idx }.toMap
+    val logicIndexes = logics.zipWithIndex.collect { case (stage, idx) if 
stage ne null => stage -> idx }.toMap
     val connectionSnapshots = connections.filter(_ ne null).map { connection =>
       ConnectionSnapshotImpl(
         connection.id,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to