This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new c2a969af47 Port akka-core#31777: O(1) mandatory attribute lookup via map index (#2758)
c2a969af47 is described below

commit c2a969af478e08676a36d70ea5df1f185ee384ac
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Mar 20 19:32:37 2026 +0100

    Port akka-core#31777: O(1) mandatory attribute lookup via map index (#2758)
    
    * Initial plan
    
    * Port akka-core PR #31777: optimize mandatory attribute lookup with O(1) map
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix unapply return type to not use unnecessary tuple wrapper
    
    Co-authored-by: pjfanning <[email protected]>
    
    * mima
    
    * Update Attributes.scala
    
    * Update optimized-mandatory-attributes.excludes
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../optimized-mandatory-attributes.excludes        |  23 ++++
 .../scala/org/apache/pekko/stream/Attributes.scala | 131 +++++++++++++++++----
 2 files changed, 128 insertions(+), 26 deletions(-)

diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/optimized-mandatory-attributes.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/optimized-mandatory-attributes.excludes
new file mode 100644
index 0000000000..b38fc19c8d
--- /dev/null
+++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/optimized-mandatory-attributes.excludes
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Optimized mandatory attributes
+ProblemFilters.exclude[FinalClassProblem]("org.apache.pekko.stream.Attributes$NestedMaterializationCancellationPolicy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Attributes.fromProduct")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.Attributes.unapply")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Attributes._1")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.stream.Attributes$")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
index 28a94b9ca7..838eedc52b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
@@ -17,6 +17,7 @@ import java.net.URLEncoder
 import java.time.Duration
 import java.util.Optional
 
+import scala.annotation.nowarn
 import scala.annotation.tailrec
 import scala.concurrent.duration.FiniteDuration
 import scala.jdk.DurationConverters._
@@ -31,7 +32,7 @@ import pekko.annotation.InternalApi
 import pekko.event.Logging
 import pekko.japi.function
 import pekko.stream.impl.TraversalBuilder
-import pekko.util.{ ByteString, OptionVal }
+import pekko.util.ByteString
 import pekko.util.LineNumbers
 
 /**
@@ -45,11 +46,36 @@ import pekko.util.LineNumbers
  *
  * Operators should in general not access the `attributeList` but instead use `get` to get the expected
  * value of an attribute.
+ *
+ * Constructor is internal API, use factories in companion to create instances.
  */
-final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
+final class Attributes private[pekko] (
+    val attributeList: List[Attributes.Attribute],
+    private val mandatoryAttributes: Map[Class[AnyRef], Attributes.MandatoryAttribute])
+    extends scala.Product
+    with scala.Serializable
+    with scala.Equals {
 
   import Attributes._
 
+  // for binary compatibility (used to be a case class)
+  @deprecated("Use factories on companion object instead", since = "2.0.0")
+  @nowarn("msg=deprecated")
+  def this(attributeList: List[Attributes.Attribute] = Nil) =
+    this(
+      attributeList,
+      attributeList.reverseIterator
+        .foldLeft(Map.newBuilder[Class[AnyRef], Attributes.MandatoryAttribute]) {
+          case (builder, attribute) =>
+            attribute match {
+              case m: Attributes.MandatoryAttribute =>
+                builder += (m.getClass.asInstanceOf[Class[AnyRef]] -> m)
+                builder
+              case _ => builder
+            }
+        }
+        .result())
+
   /**
    * Note that this must only be used during traversal building and not during materialization
    * as it will then always return true because of the defaults from the ActorMaterializerSettings
@@ -119,6 +145,8 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
   /**
    * Scala API: Get the most specific of one of the mandatory attributes. Mandatory attributes are guaranteed
    * to always be among the attributes when the attributes are coming from a materialization.
+   *
+   * Note: looks for the exact mandatory attribute class, hierarchies of the same mandatory attribute not supported
    */
   def mandatoryAttribute[T <: MandatoryAttribute: ClassTag]: T = {
     val c = classTag[T].runtimeClass.asInstanceOf[Class[T]]
@@ -129,20 +157,16 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
    * Java API: Get the most specific of one of the mandatory attributes. Mandatory attributes are guaranteed
    * to always be among the attributes when the attributes are coming from a materialization.
    *
+   * Note: looks for the exact mandatory attribute class, hierarchies of the same mandatory attribute not supported
+   *
    * @param c A class that is a subtype of [[MandatoryAttribute]]
    */
   def getMandatoryAttribute[T <: MandatoryAttribute](c: Class[T]): T = {
-    @tailrec
-    def find(list: List[Attribute]): OptionVal[Attribute] = list match {
-      case Nil          => OptionVal.None
-      case head :: tail =>
-        if (c.isInstance(head)) OptionVal.Some(head)
-        else find(tail)
-    }
-
-    find(attributeList) match {
-      case OptionVal.Some(t) => t.asInstanceOf[T]
-      case _                 => throw new IllegalStateException(s"Mandatory attribute [$c] not found")
+    try {
+      mandatoryAttributes(c.asInstanceOf[Class[AnyRef]]).asInstanceOf[T]
+    } catch {
+      case _: NoSuchElementException =>
+        throw new IllegalStateException(s"Mandatory attribute [$c] not found")
     }
   }
 
@@ -153,16 +177,30 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
   def and(other: Attributes): Attributes = {
     if (attributeList.isEmpty) other
     else if (other.attributeList.isEmpty) this
-    else if (other.attributeList.tail.isEmpty) Attributes(other.attributeList.head :: attributeList)
-    else Attributes(other.attributeList ::: attributeList)
+    else if (other.attributeList.tail.isEmpty) {
+      // note the inverted order for attributes vs mandatory values here
+      val newAttributes = other.attributeList.head :: attributeList
+      val newMandatory = this.mandatoryAttributes ++ other.mandatoryAttributes
+      new Attributes(newAttributes, newMandatory)
+    } else {
+      val newAttributes = other.attributeList ::: attributeList
+      val newMandatory = this.mandatoryAttributes ++ other.mandatoryAttributes
+      new Attributes(newAttributes, newMandatory)
+    }
   }
 
   /**
    * Adds given attribute. Added attribute is considered more specific than
    * already existing attributes of the same type.
    */
-  def and(other: Attribute): Attributes =
-    Attributes(other :: attributeList)
+  def and(other: Attribute): Attributes = {
+    other match {
+      case m: MandatoryAttribute =>
+        new Attributes(other :: attributeList, mandatoryAttributes + (m.getClass.asInstanceOf[Class[AnyRef]] -> m))
+      case regular =>
+        new Attributes(regular :: attributeList, mandatoryAttributes)
+    }
+  }
 
   /**
    * Extracts Name attributes and concatenates them.
@@ -298,6 +336,36 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
     attributeList.reverseIterator.collectFirst { case attr if c.isInstance(attr) => c.cast(attr) }
   }
 
+  // for binary compatibility (used to be a case class)
+
+  @deprecated("Use explicit methods on Attributes to interact, not the ones provided by Product", "2.0.0")
+  override def productArity: Int = 1
+
+  @deprecated("Use explicit methods on Attributes to interact, not the ones provided by Product", "2.0.0")
+  override def productElement(n: Int): Any = n match {
+    case 0 => attributeList
+    case _ => throw new IllegalArgumentException()
+  }
+
+  @deprecated("Don't use copy on Attributes", "2.0.0")
+  @nowarn("msg=deprecated")
+  def copy(attributeList: List[Attribute] = attributeList): Attributes =
+    new Attributes(attributeList)
+
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[Attributes]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: Attributes =>
+      attributeList == that.attributeList &&
+      mandatoryAttributes == that.mandatoryAttributes
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    val state = Seq(attributeList, mandatoryAttributes)
+    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+  }
+
 }
 
 /**
@@ -315,6 +383,17 @@ object Attributes {
   @DoNotInherit
   sealed trait MandatoryAttribute extends Attribute
 
+  def apply(): Attributes = new Attributes(Nil, Map.empty)
+
+  @nowarn("msg=deprecated")
+  def apply(attributeList: List[Attribute] = Nil): Attributes = new Attributes(attributeList)
+
+  // for binary compatibility
+
+  @deprecated("Use explicit methods on Attributes to interact, not the synthetic case class ones", "2.0.0")
+  def unapply(attrs: Attributes): Option[List[Attribute]] =
+    Some(attrs.attributeList)
+
   final case class Name(n: String) extends Attribute
 
   /**
@@ -561,9 +640,12 @@ object Attributes {
    * , otherwise these stages will immediately cancel without materializing the nested flow.
    */
   @ApiMayChange
-  class NestedMaterializationCancellationPolicy private[NestedMaterializationCancellationPolicy] (
-      val propagateToNestedMaterialization: Boolean)
-      extends MandatoryAttribute
+  final class NestedMaterializationCancellationPolicy private[NestedMaterializationCancellationPolicy] (
+      val propagateToNestedMaterialization: Boolean,
+      name: String)
+      extends MandatoryAttribute {
+    override def toString: String = name
+  }
 
   @ApiMayChange
   object NestedMaterializationCancellationPolicy {
@@ -575,9 +657,7 @@ object Attributes {
      * This applies to [[pekko.stream.scaladsl.FlowOps.flatMapPrefix]], [[pekko.stream.scaladsl.Flow.futureFlow]] and derived operators.
      */
     val EagerCancellation: NestedMaterializationCancellationPolicy =
-      new NestedMaterializationCancellationPolicy(false) {
-        override def toString: String = "EagerCancellation"
-      }
+      new NestedMaterializationCancellationPolicy(false, "EagerCancellation")
 
     /**
      * A [[NestedMaterializationCancellationPolicy]] that configures graph stages
@@ -585,9 +665,8 @@ object Attributes {
      * nested flow materialization. Once the nested flow is materialized it will be cancelled immediately.
      * This applies to [[pekko.stream.scaladsl.FlowOps.flatMapPrefix]], [[pekko.stream.scaladsl.Flow.futureFlow]] and derived operators.
      */
-    val PropagateToNested: NestedMaterializationCancellationPolicy = new NestedMaterializationCancellationPolicy(true) {
-      override def toString: String = "PropagateToNested"
-    }
+    val PropagateToNested: NestedMaterializationCancellationPolicy =
+      new NestedMaterializationCancellationPolicy(true, "PropagateToNested")
 
     /**
      * Default [[NestedMaterializationCancellationPolicy]],


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to