This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git
The following commit(s) were added to refs/heads/main by this push:
new 01e10e130 Fix integer/long overflow in content-length parsing,
timestamp ordering, and HTTP/2 flow control (#1049)
01e10e130 is described below
commit 01e10e130b43b3177135d76935696249cd4a9ee5
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jun 14 17:19:22 2026 +0100
Fix integer/long overflow in content-length parsing, timestamp ordering,
and HTTP/2 flow control (#1049)
* Fix int/long overflow bugs in ContentLength, Timestamp, and HTTP/2 flow
control
* Update license header in TimestampSpec.scala
* Update SpecializedHeaderValueParsers.scala
* Create num-overflow.excludes
* move
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
---
.../2.0.x.backwards.excludes/num-overflow.excludes | 22 ++++++++
.../pekko/http/impl/engine/http2/Http2Demux.scala | 4 +-
.../http/impl/engine/http2/Http2Multiplexer.scala | 16 ++++--
.../http/impl/engine/http2/Http2Protocol.scala | 11 ++++
.../impl/engine/http2/Http2StreamHandling.scala | 50 +++++++++++++------
.../parsing/SpecializedHeaderValueParsers.scala | 9 ++--
.../apache/pekko/http/impl/util/Timestamp.scala | 4 +-
.../parsing/ContentLengthHeaderParserSpec.scala | 2 +
.../pekko/http/impl/util/TimestampSpec.scala | 58 ++++++++++++++++++++++
.../http/impl/engine/http2/Http2ServerSpec.scala | 23 +++++++++
10 files changed, 174 insertions(+), 25 deletions(-)
diff --git
a/http-core/src/main/mima-filters/2.0.x.backwards.excludes/num-overflow.excludes
b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/num-overflow.excludes
new file mode 100644
index 000000000..511213069
--- /dev/null
+++
b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/num-overflow.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# changes made to better apply number overflow edge cases
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.increaseWindow")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.streamId")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.increaseWindow")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStreamImpl.increaseWindow")
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala
index ea02bf7b4..77dd782dd 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala
@@ -355,7 +355,9 @@ private[http2] abstract class Http2Demux(http2Settings:
Http2CommonSettings,
frame match {
case WindowUpdateFrame(streamId, increment)
if streamId == 0 /* else fall through to StreamFrameEvent */
=>
- multiplexer.updateConnectionLevelWindow(increment)
+ if (!multiplexer.updateConnectionLevelWindow(increment))
+ pushGOAWAY(ErrorCode.FLOW_CONTROL_ERROR,
+ "WINDOW_UPDATE would exceed maximum connection-level
flow-control window size")
case p: PriorityFrame => multiplexer.updatePriority(p)
case s: StreamFrameEvent =>
if (!terminating)
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
index 3a7f77b3e..60c3ad996 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
@@ -33,7 +33,7 @@ import scala.collection.mutable
@InternalApi
private[http2] trait Http2Multiplexer {
def pushControlFrame(frame: FrameEvent): Unit
- def updateConnectionLevelWindow(increment: Int): Unit
+ def updateConnectionLevelWindow(increment: Int): Boolean
def updateMaxFrameSize(newMaxFrameSize: Int): Unit
def updateDefaultWindow(newDefaultWindow: Int): Unit
def updatePriority(priorityFrame: PriorityFrame): Unit
@@ -110,10 +110,16 @@ private[http2] trait Http2MultiplexerSupport { logic:
GraphStageLogic with Stage
override def pushControlFrame(frame: FrameEvent): Unit =
updateState(_.pushControlFrame(frame))
- def updateConnectionLevelWindow(increment: Int): Unit = {
- connectionWindowLeft += increment
- debug(s"Updating outgoing connection window by $increment to
$connectionWindowLeft")
- updateState(_.connectionWindowAvailable())
+ def updateConnectionLevelWindow(increment: Int): Boolean = {
+ val newWindow = connectionWindowLeft.toLong + increment
+ if (newWindow > Http2Protocol.MaxWindowSize) {
+ false
+ } else {
+ connectionWindowLeft = newWindow.toInt
+ debug(s"Updating outgoing connection window by $increment to
$connectionWindowLeft")
+ updateState(_.connectionWindowAvailable())
+ true
+ }
}
override def updateMaxFrameSize(newMaxFrameSize: Int): Unit =
currentMaxFrameSize = newMaxFrameSize
override def updateDefaultWindow(newDefaultWindow: Int): Unit = {
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala
index f81ea6ebb..daaf3f0be 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala
@@ -54,6 +54,17 @@ private[http] object Http2Protocol {
*/
final val InitialWindowSize = 65535
+ /**
+ * The maximum value of a flow-control window size as defined by the
specification.
+ *
+ * See https://tools.ietf.org/html/rfc7540#section-6.9.1:
+ * A sender MUST NOT allow a flow-control window to exceed 2^31-1 octets.
+ * If a sender receives a WINDOW_UPDATE that causes a flow-control window
to
+ * exceed this maximum, it MUST treat this as a connection or stream error
+ * (Section 5.4) of type FLOW_CONTROL_ERROR.
+ */
+ final val MaxWindowSize: Int = Int.MaxValue
+
/**
* The initial frame size for both incoming and outgoing frames as defined
by the
* specification.
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
index 6da9f506f..e74691ab4 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
@@ -380,8 +380,12 @@ private[http2] trait Http2StreamHandling extends
GraphStageLogic with LogHelper
extends
ReceivingDataWithBuffer(HalfClosedRemoteWaitingForOutgoingStream(extraInitialWindow))
{
override def handleOutgoingCreated(
outStream: OutStream, correlationAttributes: Map[AttributeKey[?], ?]):
StreamState = {
- outStream.increaseWindow(extraInitialWindow)
- Open(buffer, outStream)
+ if (outStream.increaseWindow(extraInitialWindow)) Open(buffer, outStream)
+ else {
+ outStream.cancelStream()
+ multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId,
ErrorCode.FLOW_CONTROL_ERROR))
+ Closed
+ }
}
override def handleOutgoingCreatedAndFinished(correlationAttributes:
Map[AttributeKey[?], ?]): StreamState =
HalfClosedLocal(buffer)
@@ -421,8 +425,11 @@ private[http2] trait Http2StreamHandling extends
GraphStageLogic with LogHelper
}
def increaseWindow(delta: Int): StreamState = {
- outStream.increaseWindow(delta)
- this
+ if (outStream.increaseWindow(delta)) this
+ else {
+ multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId,
ErrorCode.FLOW_CONTROL_ERROR))
+ Closed
+ }
}
}
@@ -531,8 +538,12 @@ private[http2] trait Http2StreamHandling extends
GraphStageLogic with LogHelper
outStream.cancelStream()
}
override def incrementWindow(delta: Int): StreamState = {
- outStream.increaseWindow(delta)
- this
+ if (outStream.increaseWindow(delta)) this
+ else {
+ outStream.cancelStream()
+ multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId,
ErrorCode.FLOW_CONTROL_ERROR))
+ Closed
+ }
}
}
@@ -553,8 +564,12 @@ private[http2] trait Http2StreamHandling extends
GraphStageLogic with LogHelper
override def handleOutgoingCreated(
outStream: OutStream, correlationAttributes: Map[AttributeKey[?], ?]):
StreamState = {
- outStream.increaseWindow(extraInitialWindow)
- HalfClosedRemoteSendingData(outStream)
+ if (outStream.increaseWindow(extraInitialWindow))
HalfClosedRemoteSendingData(outStream)
+ else {
+ outStream.cancelStream()
+ multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId,
ErrorCode.FLOW_CONTROL_ERROR))
+ Closed
+ }
}
override def handleOutgoingCreatedAndFinished(correlationAttributes:
Map[AttributeKey[?], ?]): StreamState = Closed
}
@@ -700,11 +715,12 @@ private[http2] trait Http2StreamHandling extends
GraphStageLogic with LogHelper
}
trait OutStream {
+ def streamId: Int
def canSend: Boolean
def cancelStream(): Unit
def endStreamIfPossible(): Option[FrameEvent]
def nextFrame(maxBytesToSend: Int): DataFrame
- def increaseWindow(delta: Int): Unit
+ def increaseWindow(delta: Int): Boolean
def isDone: Boolean
}
object OutStream {
@@ -816,11 +832,17 @@ private[http2] trait Http2StreamHandling extends
GraphStageLogic with LogHelper
}
def bufferedBytes: Int = buffer.length
- override def increaseWindow(increment: Int): Unit = if (increment >= 0) {
- outboundWindowLeft += increment
- debug(s"Updating window for $streamId by $increment to
$outboundWindowLeft buffered bytes: $bufferedBytes")
- enqueueIfPossible()
- }
+ override def increaseWindow(increment: Int): Boolean = if (increment >= 0)
{
+ val newWindow = outboundWindowLeft.toLong + increment
+ if (newWindow > Http2Protocol.MaxWindowSize) {
+ false
+ } else {
+ outboundWindowLeft = newWindow.toInt
+ debug(s"Updating window for $streamId by $increment to
$outboundWindowLeft buffered bytes: $bufferedBytes")
+ enqueueIfPossible()
+ true
+ }
+ } else true
// external callbacks, need to make sure that potential stream state
changing events are run through the state machine
override def onPush(): Unit = {
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala
index 3b2977217..883cf8ffe 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala
@@ -37,9 +37,12 @@ private[parsing] object SpecializedHeaderValueParsers {
: (HttpHeader, Int) = {
@tailrec def recurse(ix: Int = valueStart, result: Long = 0):
(HttpHeader, Int) = {
val c = byteChar(input, ix)
- if (result < 0) fail("`Content-Length` header value must not exceed
63-bit integer range")
- else if (DIGIT(c)) recurse(ix + 1, result * 10 + c - '0')
- else if (WSP(c)) recurse(ix + 1, result)
+ if (DIGIT(c)) {
+ val digit = c - '0'
+ if (result > (Long.MaxValue - digit) / 10)
+ fail("`Content-Length` header value must not exceed 63-bit integer
range")
+ else recurse(ix + 1, result * 10 + digit)
+ } else if (WSP(c)) recurse(ix + 1, result)
else if (c == '\r' && byteAt(input, ix + 1) == LF_BYTE)
(`Content-Length`(result), ix + 2)
else if (c == '\n') (`Content-Length`(result), ix + 1)
else fail("Illegal `Content-Length` header value")
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala
index f4963bcd4..b63370719 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
* We mark it private[http] because we don't want to support it as public API.
*/
@InternalApi
-private[http] class Timestamp private (val timestampNanos: Long) extends
AnyVal {
+private[http] class Timestamp private[util] (val timestampNanos: Long) extends
AnyVal {
def +(period: Duration): Timestamp =
if (isNever) this
@@ -49,6 +49,6 @@ private[http] object Timestamp {
def never: Timestamp = new Timestamp(Long.MaxValue)
implicit object Ordering extends Ordering[Timestamp] {
- def compare(x: Timestamp, y: Timestamp): Int =
math.signum(x.timestampNanos - y.timestampNanos).toInt
+ def compare(x: Timestamp, y: Timestamp): Int =
java.lang.Long.compare(x.timestampNanos, y.timestampNanos)
}
}
diff --git
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala
index 1671d6249..2e28d92e4 100644
---
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala
+++
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala
@@ -37,6 +37,8 @@ abstract class ContentLengthHeaderParserSpec(mode: String,
newLine: String) exte
a[ParsingException] should be thrownBy parse("9223372036854775808") //
Long.MaxValue + 1
a[ParsingException] should be thrownBy parse("92233720368547758070") //
Long.MaxValue * 10 which is 0 taken overflow into account
a[ParsingException] should be thrownBy parse("92233720368547758080") //
(Long.MaxValue + 1) * 10 which is 0 taken overflow into account
+ // overflow that wraps to a small positive value (was not caught by the
old `result < 0` check)
+ a[ParsingException] should be thrownBy parse("18446744073709551634") //
~2^64+18, wraps to 18 in signed Long
}
}
diff --git
a/http-core/src/test/scala/org/apache/pekko/http/impl/util/TimestampSpec.scala
b/http-core/src/test/scala/org/apache/pekko/http/impl/util/TimestampSpec.scala
new file mode 100644
index 000000000..6b3630c32
--- /dev/null
+++
b/http-core/src/test/scala/org/apache/pekko/http/impl/util/TimestampSpec.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.http.impl.util
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class TimestampSpec extends AnyWordSpec with Matchers {
+
+ "Timestamp.Ordering" should {
+ "correctly order two normal timestamps" in {
+ val t1 = new Timestamp(100L)
+ val t2 = new Timestamp(200L)
+ Timestamp.Ordering.compare(t1, t2) should be < 0
+ Timestamp.Ordering.compare(t2, t1) should be > 0
+ Timestamp.Ordering.compare(t1, t1) shouldEqual 0
+ }
+
+ "correctly order Timestamp.never against a normal timestamp" in {
+ val normal = new Timestamp(System.nanoTime())
+ val never = Timestamp.never
+ Timestamp.Ordering.compare(normal, never) should be < 0
+ Timestamp.Ordering.compare(never, normal) should be > 0
+ Timestamp.Ordering.compare(never, never) shouldEqual 0
+ }
+
+ "not overflow when comparing Long.MaxValue (never) with Long.MinValue" in {
+ // Subtraction Long.MaxValue - Long.MinValue would overflow in signed
arithmetic;
+ // using Long.compare avoids this.
+ val tMax = new Timestamp(Long.MaxValue)
+ val tMin = new Timestamp(Long.MinValue)
+ Timestamp.Ordering.compare(tMax, tMin) should be > 0
+ Timestamp.Ordering.compare(tMin, tMax) should be < 0
+ }
+
+ "not overflow when comparing values near Long.MaxValue and Long.MinValue"
in {
+ val tAlmostMax = new Timestamp(Long.MaxValue - 1)
+ val tAlmostMin = new Timestamp(Long.MinValue + 1)
+ Timestamp.Ordering.compare(tAlmostMax, tAlmostMin) should be > 0
+ Timestamp.Ordering.compare(tAlmostMin, tAlmostMax) should be < 0
+ }
+ }
+}
diff --git
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala
index 167fc80ff..573a96335 100644
---
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala
+++
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala
@@ -1061,6 +1061,17 @@ class Http2ServerSpec extends
Http2SpecWithMaterializer("""
network.expectDecodedHEADERS(streamId = TheStreamId, endStream =
true).headers should be(
immutable.Seq(RawHeader("grpc-status", "10")))
})
+
+ "reject stream-level WINDOW_UPDATE that would overflow the flow-control
window with FLOW_CONTROL_ERROR"
+ .inAssertAllStagesStopped(
+ new WaitingForResponseDataSetup {
+ // fill the stream-level window up to Int.MaxValue
+ network.sendWINDOW_UPDATE(TheStreamId, Int.MaxValue -
Http2Protocol.InitialWindowSize)
+ // one more increment must trigger RST_STREAM with
FLOW_CONTROL_ERROR;
+ // use sendFrame directly to bypass the test-side window tracking
assertion
+ network.sendFrame(WindowUpdateFrame(TheStreamId, 1))
+ network.expectRST_STREAM(TheStreamId, ErrorCode.FLOW_CONTROL_ERROR)
+ })
}
"support multiple concurrent substreams" should {
@@ -1274,6 +1285,18 @@ class Http2ServerSpec extends
Http2SpecWithMaterializer("""
network.expectRST_STREAM(1, ErrorCode.PROTOCOL_ERROR)
})
+ "reject connection-level WINDOW_UPDATE that would overflow the
flow-control window with FLOW_CONTROL_ERROR"
+ .inAssertAllStagesStopped(
+ new TestSetup with RequestResponseProbes {
+ // fill the connection-level window up to Int.MaxValue
+ network.sendWINDOW_UPDATE(0, Int.MaxValue -
Http2Protocol.InitialWindowSize)
+ // one more increment must trigger GOAWAY with FLOW_CONTROL_ERROR;
+ // use sendFrame directly to bypass the test-side window tracking
assertion
+ network.sendFrame(WindowUpdateFrame(0, 1))
+ val (_, errorCode) = network.expectGOAWAY()
+ errorCode should ===(ErrorCode.FLOW_CONTROL_ERROR)
+ })
+
"backpressure incoming frames when outgoing control frame buffer
fills".inAssertAllStagesStopped(
new TestSetup with HandlerFunctionSupport {
override def settings: ServerSettings =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]