This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 01e10e130 Fix integer/long overflow in content-length parsing, 
timestamp ordering, and HTTP/2 flow control (#1049)
01e10e130 is described below

commit 01e10e130b43b3177135d76935696249cd4a9ee5
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jun 14 17:19:22 2026 +0100

    Fix integer/long overflow in content-length parsing, timestamp ordering, 
and HTTP/2 flow control (#1049)
    
    * Fix int/long overflow bugs in ContentLength, Timestamp, and HTTP/2 flow 
control
    
    * Update license header in TimestampSpec.scala
    
    * Update SpecializedHeaderValueParsers.scala
    
    * Create num-overflow.excludes
    
    * move
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
---
 .../2.0.x.backwards.excludes/num-overflow.excludes | 22 ++++++++
 .../pekko/http/impl/engine/http2/Http2Demux.scala  |  4 +-
 .../http/impl/engine/http2/Http2Multiplexer.scala  | 16 ++++--
 .../http/impl/engine/http2/Http2Protocol.scala     | 11 ++++
 .../impl/engine/http2/Http2StreamHandling.scala    | 50 +++++++++++++------
 .../parsing/SpecializedHeaderValueParsers.scala    |  9 ++--
 .../apache/pekko/http/impl/util/Timestamp.scala    |  4 +-
 .../parsing/ContentLengthHeaderParserSpec.scala    |  2 +
 .../pekko/http/impl/util/TimestampSpec.scala       | 58 ++++++++++++++++++++++
 .../http/impl/engine/http2/Http2ServerSpec.scala   | 23 +++++++++
 10 files changed, 174 insertions(+), 25 deletions(-)

diff --git 
a/http-core/src/main/mima-filters/2.0.x.backwards.excludes/num-overflow.excludes
 
b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/num-overflow.excludes
new file mode 100644
index 000000000..511213069
--- /dev/null
+++ 
b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/num-overflow.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# changes made to better apply number overflow edge cases
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.increaseWindow")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.streamId")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.increaseWindow")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStreamImpl.increaseWindow")
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala
index ea02bf7b4..77dd782dd 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala
@@ -355,7 +355,9 @@ private[http2] abstract class Http2Demux(http2Settings: 
Http2CommonSettings,
             frame match {
               case WindowUpdateFrame(streamId, increment)
                   if streamId == 0 /* else fall through to StreamFrameEvent */ 
=>
-                multiplexer.updateConnectionLevelWindow(increment)
+                if (!multiplexer.updateConnectionLevelWindow(increment))
+                  pushGOAWAY(ErrorCode.FLOW_CONTROL_ERROR,
+                    "WINDOW_UPDATE would exceed maximum connection-level 
flow-control window size")
               case p: PriorityFrame    => multiplexer.updatePriority(p)
               case s: StreamFrameEvent =>
                 if (!terminating)
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
index 3a7f77b3e..60c3ad996 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
@@ -33,7 +33,7 @@ import scala.collection.mutable
 @InternalApi
 private[http2] trait Http2Multiplexer {
   def pushControlFrame(frame: FrameEvent): Unit
-  def updateConnectionLevelWindow(increment: Int): Unit
+  def updateConnectionLevelWindow(increment: Int): Boolean
   def updateMaxFrameSize(newMaxFrameSize: Int): Unit
   def updateDefaultWindow(newDefaultWindow: Int): Unit
   def updatePriority(priorityFrame: PriorityFrame): Unit
@@ -110,10 +110,16 @@ private[http2] trait Http2MultiplexerSupport { logic: 
GraphStageLogic with Stage
 
       override def pushControlFrame(frame: FrameEvent): Unit = 
updateState(_.pushControlFrame(frame))
 
-      def updateConnectionLevelWindow(increment: Int): Unit = {
-        connectionWindowLeft += increment
-        debug(s"Updating outgoing connection window by $increment to 
$connectionWindowLeft")
-        updateState(_.connectionWindowAvailable())
+      def updateConnectionLevelWindow(increment: Int): Boolean = {
+        val newWindow = connectionWindowLeft.toLong + increment
+        if (newWindow > Http2Protocol.MaxWindowSize) {
+          false
+        } else {
+          connectionWindowLeft = newWindow.toInt
+          debug(s"Updating outgoing connection window by $increment to 
$connectionWindowLeft")
+          updateState(_.connectionWindowAvailable())
+          true
+        }
       }
       override def updateMaxFrameSize(newMaxFrameSize: Int): Unit = 
currentMaxFrameSize = newMaxFrameSize
       override def updateDefaultWindow(newDefaultWindow: Int): Unit = {
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala
index f81ea6ebb..daaf3f0be 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala
@@ -54,6 +54,17 @@ private[http] object Http2Protocol {
    */
   final val InitialWindowSize = 65535
 
+  /**
+   * The maximum value of a flow-control window size as defined by the 
specification.
+   *
+   * See https://tools.ietf.org/html/rfc7540#section-6.9.1:
+   *    A sender MUST NOT allow a flow-control window to exceed 2^31-1 octets.
+   *    If a sender receives a WINDOW_UPDATE that causes a flow-control window 
to
+   *    exceed this maximum, it MUST treat this as a connection or stream error
+   *    (Section 5.4) of type FLOW_CONTROL_ERROR.
+   */
+  final val MaxWindowSize: Int = Int.MaxValue
+
   /**
    * The initial frame size for both incoming and outgoing frames as defined 
by the
    * specification.
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
index 6da9f506f..e74691ab4 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
@@ -380,8 +380,12 @@ private[http2] trait Http2StreamHandling extends 
GraphStageLogic with LogHelper
       extends 
ReceivingDataWithBuffer(HalfClosedRemoteWaitingForOutgoingStream(extraInitialWindow))
 {
     override def handleOutgoingCreated(
         outStream: OutStream, correlationAttributes: Map[AttributeKey[?], ?]): 
StreamState = {
-      outStream.increaseWindow(extraInitialWindow)
-      Open(buffer, outStream)
+      if (outStream.increaseWindow(extraInitialWindow)) Open(buffer, outStream)
+      else {
+        outStream.cancelStream()
+        multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, 
ErrorCode.FLOW_CONTROL_ERROR))
+        Closed
+      }
     }
     override def handleOutgoingCreatedAndFinished(correlationAttributes: 
Map[AttributeKey[?], ?]): StreamState =
       HalfClosedLocal(buffer)
@@ -421,8 +425,11 @@ private[http2] trait Http2StreamHandling extends 
GraphStageLogic with LogHelper
     }
 
     def increaseWindow(delta: Int): StreamState = {
-      outStream.increaseWindow(delta)
-      this
+      if (outStream.increaseWindow(delta)) this
+      else {
+        multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, 
ErrorCode.FLOW_CONTROL_ERROR))
+        Closed
+      }
     }
   }
 
@@ -531,8 +538,12 @@ private[http2] trait Http2StreamHandling extends 
GraphStageLogic with LogHelper
       outStream.cancelStream()
     }
     override def incrementWindow(delta: Int): StreamState = {
-      outStream.increaseWindow(delta)
-      this
+      if (outStream.increaseWindow(delta)) this
+      else {
+        outStream.cancelStream()
+        multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, 
ErrorCode.FLOW_CONTROL_ERROR))
+        Closed
+      }
     }
   }
 
@@ -553,8 +564,12 @@ private[http2] trait Http2StreamHandling extends 
GraphStageLogic with LogHelper
 
     override def handleOutgoingCreated(
         outStream: OutStream, correlationAttributes: Map[AttributeKey[?], ?]): 
StreamState = {
-      outStream.increaseWindow(extraInitialWindow)
-      HalfClosedRemoteSendingData(outStream)
+      if (outStream.increaseWindow(extraInitialWindow)) 
HalfClosedRemoteSendingData(outStream)
+      else {
+        outStream.cancelStream()
+        multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, 
ErrorCode.FLOW_CONTROL_ERROR))
+        Closed
+      }
     }
     override def handleOutgoingCreatedAndFinished(correlationAttributes: 
Map[AttributeKey[?], ?]): StreamState = Closed
   }
@@ -700,11 +715,12 @@ private[http2] trait Http2StreamHandling extends 
GraphStageLogic with LogHelper
   }
 
   trait OutStream {
+    def streamId: Int
     def canSend: Boolean
     def cancelStream(): Unit
     def endStreamIfPossible(): Option[FrameEvent]
     def nextFrame(maxBytesToSend: Int): DataFrame
-    def increaseWindow(delta: Int): Unit
+    def increaseWindow(delta: Int): Boolean
     def isDone: Boolean
   }
   object OutStream {
@@ -816,11 +832,17 @@ private[http2] trait Http2StreamHandling extends 
GraphStageLogic with LogHelper
     }
     def bufferedBytes: Int = buffer.length
 
-    override def increaseWindow(increment: Int): Unit = if (increment >= 0) {
-      outboundWindowLeft += increment
-      debug(s"Updating window for $streamId by $increment to 
$outboundWindowLeft buffered bytes: $bufferedBytes")
-      enqueueIfPossible()
-    }
+    override def increaseWindow(increment: Int): Boolean = if (increment >= 0) 
{
+      val newWindow = outboundWindowLeft.toLong + increment
+      if (newWindow > Http2Protocol.MaxWindowSize) {
+        false
+      } else {
+        outboundWindowLeft = newWindow.toInt
+        debug(s"Updating window for $streamId by $increment to 
$outboundWindowLeft buffered bytes: $bufferedBytes")
+        enqueueIfPossible()
+        true
+      }
+    } else true
 
     // external callbacks, need to make sure that potential stream state 
changing events are run through the state machine
     override def onPush(): Unit = {
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala
index 3b2977217..883cf8ffe 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala
@@ -37,9 +37,12 @@ private[parsing] object SpecializedHeaderValueParsers {
         : (HttpHeader, Int) = {
       @tailrec def recurse(ix: Int = valueStart, result: Long = 0): 
(HttpHeader, Int) = {
         val c = byteChar(input, ix)
-        if (result < 0) fail("`Content-Length` header value must not exceed 
63-bit integer range")
-        else if (DIGIT(c)) recurse(ix + 1, result * 10 + c - '0')
-        else if (WSP(c)) recurse(ix + 1, result)
+        if (DIGIT(c)) {
+          val digit = c - '0'
+          if (result > (Long.MaxValue - digit) / 10)
+            fail("`Content-Length` header value must not exceed 63-bit integer 
range")
+          else recurse(ix + 1, result * 10 + digit)
+        } else if (WSP(c)) recurse(ix + 1, result)
         else if (c == '\r' && byteAt(input, ix + 1) == LF_BYTE) 
(`Content-Length`(result), ix + 2)
         else if (c == '\n') (`Content-Length`(result), ix + 1)
         else fail("Illegal `Content-Length` header value")
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala 
b/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala
index f4963bcd4..b63370719 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
  * We mark it private[http] because we don't want to support it as public API.
  */
 @InternalApi
-private[http] class Timestamp private (val timestampNanos: Long) extends 
AnyVal {
+private[http] class Timestamp private[util] (val timestampNanos: Long) extends 
AnyVal {
 
   def +(period: Duration): Timestamp =
     if (isNever) this
@@ -49,6 +49,6 @@ private[http] object Timestamp {
   def never: Timestamp = new Timestamp(Long.MaxValue)
 
   implicit object Ordering extends Ordering[Timestamp] {
-    def compare(x: Timestamp, y: Timestamp): Int = 
math.signum(x.timestampNanos - y.timestampNanos).toInt
+    def compare(x: Timestamp, y: Timestamp): Int = 
java.lang.Long.compare(x.timestampNanos, y.timestampNanos)
   }
 }
diff --git 
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala
 
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala
index 1671d6249..2e28d92e4 100644
--- 
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala
+++ 
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala
@@ -37,6 +37,8 @@ abstract class ContentLengthHeaderParserSpec(mode: String, 
newLine: String) exte
       a[ParsingException] should be thrownBy parse("9223372036854775808") // 
Long.MaxValue + 1
       a[ParsingException] should be thrownBy parse("92233720368547758070") // 
Long.MaxValue * 10 which is 0 taken overflow into account
       a[ParsingException] should be thrownBy parse("92233720368547758080") // 
(Long.MaxValue + 1) * 10 which is 0 taken overflow into account
+      // overflow that wraps to a small positive value (was not caught by the 
old `result < 0` check)
+      a[ParsingException] should be thrownBy parse("18446744073709551634") // 
~2^64+18, wraps to 18 in signed Long
     }
   }
 
diff --git 
a/http-core/src/test/scala/org/apache/pekko/http/impl/util/TimestampSpec.scala 
b/http-core/src/test/scala/org/apache/pekko/http/impl/util/TimestampSpec.scala
new file mode 100644
index 000000000..6b3630c32
--- /dev/null
+++ 
b/http-core/src/test/scala/org/apache/pekko/http/impl/util/TimestampSpec.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.http.impl.util
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class TimestampSpec extends AnyWordSpec with Matchers {
+
+  "Timestamp.Ordering" should {
+    "correctly order two normal timestamps" in {
+      val t1 = new Timestamp(100L)
+      val t2 = new Timestamp(200L)
+      Timestamp.Ordering.compare(t1, t2) should be < 0
+      Timestamp.Ordering.compare(t2, t1) should be > 0
+      Timestamp.Ordering.compare(t1, t1) shouldEqual 0
+    }
+
+    "correctly order Timestamp.never against a normal timestamp" in {
+      val normal = new Timestamp(System.nanoTime())
+      val never = Timestamp.never
+      Timestamp.Ordering.compare(normal, never) should be < 0
+      Timestamp.Ordering.compare(never, normal) should be > 0
+      Timestamp.Ordering.compare(never, never) shouldEqual 0
+    }
+
+    "not overflow when comparing Long.MaxValue (never) with Long.MinValue" in {
+      // Subtraction Long.MaxValue - Long.MinValue would overflow in signed 
arithmetic;
+      // using Long.compare avoids this.
+      val tMax = new Timestamp(Long.MaxValue)
+      val tMin = new Timestamp(Long.MinValue)
+      Timestamp.Ordering.compare(tMax, tMin) should be > 0
+      Timestamp.Ordering.compare(tMin, tMax) should be < 0
+    }
+
+    "not overflow when comparing values near Long.MaxValue and Long.MinValue" 
in {
+      val tAlmostMax = new Timestamp(Long.MaxValue - 1)
+      val tAlmostMin = new Timestamp(Long.MinValue + 1)
+      Timestamp.Ordering.compare(tAlmostMax, tAlmostMin) should be > 0
+      Timestamp.Ordering.compare(tAlmostMin, tAlmostMax) should be < 0
+    }
+  }
+}
diff --git 
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala
 
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala
index 167fc80ff..573a96335 100644
--- 
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala
+++ 
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala
@@ -1061,6 +1061,17 @@ class Http2ServerSpec extends 
Http2SpecWithMaterializer("""
           network.expectDecodedHEADERS(streamId = TheStreamId, endStream = 
true).headers should be(
             immutable.Seq(RawHeader("grpc-status", "10")))
         })
+
+      "reject stream-level WINDOW_UPDATE that would overflow the flow-control 
window with FLOW_CONTROL_ERROR"
+        .inAssertAllStagesStopped(
+          new WaitingForResponseDataSetup {
+            // fill the stream-level window up to Int.MaxValue
+            network.sendWINDOW_UPDATE(TheStreamId, Int.MaxValue - 
Http2Protocol.InitialWindowSize)
+            // one more increment must trigger RST_STREAM with 
FLOW_CONTROL_ERROR;
+            // use sendFrame directly to bypass the test-side window tracking 
assertion
+            network.sendFrame(WindowUpdateFrame(TheStreamId, 1))
+            network.expectRST_STREAM(TheStreamId, ErrorCode.FLOW_CONTROL_ERROR)
+          })
     }
 
     "support multiple concurrent substreams" should {
@@ -1274,6 +1285,18 @@ class Http2ServerSpec extends 
Http2SpecWithMaterializer("""
           network.expectRST_STREAM(1, ErrorCode.PROTOCOL_ERROR)
         })
 
+      "reject connection-level WINDOW_UPDATE that would overflow the 
flow-control window with FLOW_CONTROL_ERROR"
+        .inAssertAllStagesStopped(
+          new TestSetup with RequestResponseProbes {
+            // fill the connection-level window up to Int.MaxValue
+            network.sendWINDOW_UPDATE(0, Int.MaxValue - 
Http2Protocol.InitialWindowSize)
+            // one more increment must trigger GOAWAY with FLOW_CONTROL_ERROR;
+            // use sendFrame directly to bypass the test-side window tracking 
assertion
+            network.sendFrame(WindowUpdateFrame(0, 1))
+            val (_, errorCode) = network.expectGOAWAY()
+            errorCode should ===(ErrorCode.FLOW_CONTROL_ERROR)
+          })
+
       "backpressure incoming frames when outgoing control frame buffer 
fills".inAssertAllStagesStopped(
         new TestSetup with HandlerFunctionSupport {
           override def settings: ServerSettings =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to