This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 68449a0cd Port alpakka PRs #2969 and #2971: fix MQTT QoS encoding and
UnsubAck decoding (#1594)
68449a0cd is described below
commit 68449a0cdd24879fc0887099746801039850e5da
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Apr 28 09:31:44 2026 +0200
Port alpakka PRs #2969 and #2971: fix MQTT QoS encoding and UnsubAck
decoding (#1594)
* Port alpakka PRs #2969 and #2971: fix MQTT QoS encoding and UnsubAck
decoding
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/0bc4a6e9-bdec-4131-81ed-f4d63b4c4806
Co-authored-by: pjfanning <[email protected]>
* Fix test assertion in subscribe QoS test
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/0bc4a6e9-bdec-4131-81ed-f4d63b4c4806
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* mima
* Update qos.backwards.excludes
* Re-ignore flaky server flow test - race condition not fully resolved
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/589f1e14-d041-45c4-88bc-044468037a4c
Co-authored-by: pjfanning <[email protected]>
* Fix race condition: stash PublishReceivedLocally until subscription is
registered
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/41ef4746-8bce-487e-ae91-b7c367374fae
Co-authored-by: pjfanning <[email protected]>
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
* Bound pre-subscription PublishReceivedLocally stash by
serverSendBufferSize
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/b0eac44e-5a32-46cc-a830-6f44fe48abde
Co-authored-by: pjfanning <[email protected]>
* Update assertion in MqttFlowSpec for subscription QoS
* Fix publishUnacknowledged PubRec guard: QoSAtMostOnceDelivery →
QoSExactlyOnceDelivery; add regression test
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/b5f9765c-4beb-4d69-823b-0283a9ef9257
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
.../2.0.x.backward.excludes/qos.backwards.excludes | 24 ++++++
.../mqtt/streaming/impl/ClientState.scala | 4 +-
.../mqtt/streaming/impl/RequestState.scala | 8 +-
.../mqtt/streaming/impl/ServerState.scala | 22 +++--
.../stream/connectors/mqtt/streaming/model.scala | 49 +++++++----
.../src/test/java/docs/javadsl/MqttFlowTest.java | 5 +-
.../test/scala/docs/scaladsl/MqttCodecSpec.scala | 24 +++---
.../test/scala/docs/scaladsl/MqttFlowSpec.scala | 40 +++++++--
.../test/scala/docs/scaladsl/MqttSessionSpec.scala | 95 ++++++++++++++++------
9 files changed, 201 insertions(+), 70 deletions(-)
diff --git
a/mqtt-streaming/src/main/mima-filters/2.0.x.backward.excludes/qos.backwards.excludes
b/mqtt-streaming/src/main/mima-filters/2.0.x.backward.excludes/qos.backwards.excludes
new file mode 100644
index 000000000..6c5128293
--- /dev/null
+++
b/mqtt-streaming/src/main/mima-filters/2.0.x.backward.excludes/qos.backwards.excludes
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# fix MQTT QoS encoding and UnsubAck decoding (https://github.com/apache/pekko-connectors/pull/1594)
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSFailure")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSReserved")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSExactlyOnceDelivery")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSAtLeastOnceDelivery")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSAtMostOnceDelivery")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.ReservedUnsubAck")
diff --git
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
index ac3c90fb9..6e819657e 100644
---
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
+++
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
@@ -428,7 +428,7 @@ import scala.util.{ Either, Failure, Success }
serverConnected(data, resetPingReqTimer = true)
case (_, PublishReceivedFromRemote(_, publish, local))
- if (publish.flags & ControlPacketFlags.QoSReserved).underlying
== 0 =>
+ if (publish.flags & PublishQoSFlags.QoSReserved).underlying == 0
=>
local.success(Consumer.ForwardPublish)
serverConnected(data, resetPingReqTimer = false)
@@ -479,7 +479,7 @@ import scala.util.{ Either, Failure, Success }
}
case (context, PublishReceivedLocally(publish, _))
- if (publish.flags & ControlPacketFlags.QoSReserved).underlying
== 0 =>
+ if (publish.flags & PublishQoSFlags.QoSReserved).underlying == 0
=>
QueueOfferState.waitForQueueOfferCompleted(
data.remote.offer(ForwardPublish(publish, None)),
result => QueueOfferCompleted(ByteString.empty, result.toEither),
diff --git
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
index 0081f9bec..67a7eb7df 100644
---
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
+++
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
@@ -158,12 +158,12 @@ import scala.util.{ Either, Failure, Success }
Behaviors
.receive[Event] {
case (_, PubAckReceivedFromRemote(local))
- if
data.publish.flags.contains(ControlPacketFlags.QoSAtLeastOnceDelivery) =>
+ if
data.publish.flags.contains(PublishQoSFlags.QoSAtLeastOnceDelivery) =>
local.success(ForwardPubAck(data.publishData))
Behaviors.stopped
case (_, PubRecReceivedFromRemote(local))
- if
data.publish.flags.contains(ControlPacketFlags.QoSAtMostOnceDelivery) =>
+ if
data.publish.flags.contains(PublishQoSFlags.QoSExactlyOnceDelivery) =>
local.success(ForwardPubRec(data.publishData))
timer.cancel(ReceivePubackrec)
publishAcknowledged(data)
@@ -315,10 +315,10 @@ import scala.util.{ Either, Failure, Success }
timer.startSingleTimer(ReceivePubackrel, ReceivePubAckRecTimeout,
data.settings.consumerPubAckRecTimeout)
Behaviors
.receiveMessagePartial[Event] {
- case PubAckReceivedLocally(remote) if
data.publish.flags.contains(ControlPacketFlags.QoSAtLeastOnceDelivery) =>
+ case PubAckReceivedLocally(remote) if
data.publish.flags.contains(PublishQoSFlags.QoSAtLeastOnceDelivery) =>
remote.success(ForwardPubAck)
Behaviors.stopped
- case PubRecReceivedLocally(remote) if
data.publish.flags.contains(ControlPacketFlags.QoSExactlyOnceDelivery) =>
+ case PubRecReceivedLocally(remote) if
data.publish.flags.contains(PublishQoSFlags.QoSExactlyOnceDelivery) =>
remote.success(ForwardPubRec)
timer.cancel(ReceivePubackrel)
consumeReceived(data)
diff --git
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala
index 107ce2cbd..fb6a0366f 100644
---
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala
+++
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala
@@ -486,10 +486,14 @@ import scala.util.{ Failure, Success }
data.settings)))
subscribed.future.foreach(_ => context.self !
Subscribed(subscribe))(context.executionContext)
clientConnected(data)
- case (_, Subscribed(subscribe)) =>
- clientConnected(
- data.copy(
- publishers = data.publishers ++
subscribe.topicFilters.map(_._1)))
+ case (context, Subscribed(subscribe)) =>
+ val newPublishers = data.publishers ++
subscribe.topicFilters.map(_._1)
+ val (toProcess, remaining) =
data.pendingLocalPublications.partition { case (topic, _) =>
+ !data.publishers.exists(Topics.filter(_, topic)) &&
+ newPublishers.exists(Topics.filter(_, topic))
+ }
+ toProcess.foreach { case (_, prl) => context.self ! prl }
+ clientConnected(data.copy(publishers = newPublishers,
pendingLocalPublications = remaining))
case (context, UnsubscribeReceivedFromRemote(unsubscribe, local)) =>
val unsubscribed = Promise[Done]()
context.watch(
@@ -505,7 +509,7 @@ import scala.util.{ Failure, Success }
case (_, Unsubscribed(unsubscribe)) =>
clientConnected(data.copy(publishers = data.publishers --
unsubscribe.topicFilters))
case (_, PublishReceivedFromRemote(publish, local))
- if (publish.flags & ControlPacketFlags.QoSReserved).underlying
== 0 =>
+ if (publish.flags & PublishQoSFlags.QoSReserved).underlying == 0
=>
local.success(Consumer.ForwardPublish)
clientConnected(data)
case (context, prfr @ PublishReceivedFromRemote(publish @ Publish(_,
topicName, Some(packetId), _), local)) =>
@@ -552,7 +556,7 @@ import scala.util.{ Failure, Success }
clientConnected(data.copy(activeConsumers = data.activeConsumers
- topicName))
}
case (context, PublishReceivedLocally(publish, _))
- if (publish.flags & ControlPacketFlags.QoSReserved).underlying
== 0 &&
+ if (publish.flags & PublishQoSFlags.QoSReserved).underlying == 0
&&
data.publishers.exists(Topics.filter(_, publish.topicName)) =>
QueueOfferState.waitForQueueOfferCompleted(
data.remote
@@ -579,6 +583,12 @@ import scala.util.{ Failure, Success }
clientConnected(
data.copy(pendingLocalPublications =
data.pendingLocalPublications :+ (publish.topicName -> prl)))
}
+ case (_, prl @ PublishReceivedLocally(publish, _))
+ if data.pendingLocalPublications.size <
data.settings.serverSendBufferSize =>
+ // Topic not yet subscribed - stash until subscription arrives via
Subscribed event.
+ // Bounded by serverSendBufferSize to prevent unbounded growth;
excess publishes are dropped.
+ clientConnected(
+ data.copy(pendingLocalPublications =
data.pendingLocalPublications :+ (publish.topicName -> prl)))
case (context, ProducerFree(topicName)) =>
val i = data.pendingLocalPublications.indexWhere(_._1 == topicName)
if (i >= 0) {
diff --git
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala
index 2bdee1c89..4e1d37175 100644
---
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala
+++
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala
@@ -33,7 +33,7 @@ import scala.jdk.OptionConverters._
/**
* 2.2.1 MQTT Control Packet type
- * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
+ *
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.1_-
*/
object ControlPacketType {
val Reserved1 = ControlPacketType(0)
@@ -57,8 +57,8 @@ object ControlPacketType {
final case class ControlPacketType(underlying: Int) extends AnyVal
/**
- * 2.2.2 Flags
- * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
+ * 3.3.1 Publish message header flags
+ *
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
*/
object ControlPacketFlags {
val None = ControlPacketFlags(0)
@@ -66,14 +66,31 @@ object ControlPacketFlags {
val ReservedPubRel = ControlPacketFlags(1 << 1)
val ReservedSubscribe = ControlPacketFlags(1 << 1)
val ReservedUnsubscribe = ControlPacketFlags(1 << 1)
- val ReservedUnsubAck = ControlPacketFlags(1 << 1)
val DUP = ControlPacketFlags(1 << 3)
+ val RETAIN = ControlPacketFlags(1)
+}
+
+/**
+ * 3.3.1 Publish QoS flags
+ *
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.11_-
+ */
+object PublishQoSFlags {
val QoSAtMostOnceDelivery = ControlPacketFlags(0)
val QoSAtLeastOnceDelivery = ControlPacketFlags(1 << 1)
val QoSExactlyOnceDelivery = ControlPacketFlags(2 << 1)
val QoSReserved = ControlPacketFlags(3 << 1)
- val QoSFailure = ControlPacketFlags(1 << 7)
- val RETAIN = ControlPacketFlags(1)
+}
+
+/**
+ * 3.8.3 Subscribe payload QoS
+ *
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.26_-
+ */
+object SubscribeQoSFlags {
+ val QoSAtMostOnceDelivery = ControlPacketFlags(0)
+ val QoSAtLeastOnceDelivery = ControlPacketFlags(1)
+ val QoSExactlyOnceDelivery = ControlPacketFlags(2)
+ val QoSReserved = ControlPacketFlags(3)
+ val QoSFailure = ControlPacketFlags(0x80)
}
@InternalApi
@@ -304,7 +321,7 @@ final case class Publish(override val flags:
ControlPacketFlags,
* Conveniently create a publish message with at least once delivery
*/
def this(topicName: String, payload: ByteString) =
- this(ControlPacketFlags.QoSAtLeastOnceDelivery, topicName,
Some(PacketId(0)), payload)
+ this(PublishQoSFlags.QoSAtLeastOnceDelivery, topicName, Some(PacketId(0)),
payload)
override def toString: String =
s"""Publish(flags:$flags,topicName:$topicName,packetId:$packetId,payload:${payload.size}b)"""
@@ -382,7 +399,7 @@ final case class Subscribe @InternalApi private[streaming]
(packetId: PacketId,
* A convenience for subscribing to a single topic with at-least-once
semantics
*/
def this(topicFilter: String) =
- this(PacketId(0), List(topicFilter ->
ControlPacketFlags.QoSAtLeastOnceDelivery))
+ this(PacketId(0), List(topicFilter ->
SubscribeQoSFlags.QoSAtLeastOnceDelivery))
}
/**
@@ -450,10 +467,10 @@ final case class Unsubscribe @InternalApi
private[streaming] (packetId: PacketId
/**
* 3.11 UNSUBACK – Unsubscribe acknowledgement
- * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
+ *
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077
*/
final case class UnsubAck(packetId: PacketId)
- extends ControlPacket(ControlPacketType.UNSUBACK,
ControlPacketFlags.ReservedUnsubAck)
+ extends ControlPacket(ControlPacketType.UNSUBACK,
ControlPacketFlags.ReservedGeneral)
/**
* 3.12 PINGREQ – PING request
@@ -816,7 +833,7 @@ object MqttCodec {
v.decodeSubAck(l)
case (ControlPacketType.UNSUBSCRIBE,
ControlPacketFlags.ReservedUnsubscribe) =>
v.decodeUnsubscribe(l)
- case (ControlPacketType.UNSUBACK,
ControlPacketFlags.ReservedUnsubAck) =>
+ case (ControlPacketType.UNSUBACK,
ControlPacketFlags.ReservedGeneral) =>
v.decodeUnsubAck()
case (ControlPacketType.PINGREQ,
ControlPacketFlags.ReservedGeneral) =>
Right(PingReq)
@@ -905,12 +922,12 @@ object MqttCodec {
// 3.3 PUBLISH – Publish message
def decodePublish(l: Int, flags: ControlPacketFlags): Either[DecodeError,
Publish] =
try {
- if (!flags.contains(ControlPacketFlags.QoSReserved)) {
+ if (!flags.contains(PublishQoSFlags.QoSReserved)) {
val packetLen = v.len
val topicName = v.decodeString()
val packetId =
- if (flags.contains(ControlPacketFlags.QoSAtLeastOnceDelivery) ||
- flags.contains(ControlPacketFlags.QoSExactlyOnceDelivery))
+ if (flags.contains(PublishQoSFlags.QoSAtLeastOnceDelivery) ||
+ flags.contains(PublishQoSFlags.QoSExactlyOnceDelivery))
Some(PacketId(v.getShort & 0xFFFF))
else None
val payload = v.getByteString(l - (packetLen - v.len))
@@ -982,8 +999,8 @@ object MqttCodec {
}
val topicFilters = decodeTopicFilters(l - (packetLen - v.len),
Vector.empty)
val topicFiltersValid = topicFilters.nonEmpty &&
topicFilters.foldLeft(true) {
- case (true, (Right(_), tff)) if tff.underlying <
ControlPacketFlags.QoSReserved.underlying => true
- case _
=> false
+ case (true, (Right(_), tff)) if tff.underlying <
SubscribeQoSFlags.QoSReserved.underlying => true
+ case _
=> false
}
if (topicFiltersValid) {
Right(Subscribe(packetId,
diff --git a/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
b/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
index de5c3033e..ff2e2c5b6 100644
--- a/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
+++ b/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
@@ -35,6 +35,7 @@ import
org.apache.pekko.stream.connectors.mqtt.streaming.Event;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttSessionSettings;
import org.apache.pekko.stream.connectors.mqtt.streaming.PubAck;
import org.apache.pekko.stream.connectors.mqtt.streaming.Publish;
+import org.apache.pekko.stream.connectors.mqtt.streaming.PublishQoSFlags;
import org.apache.pekko.stream.connectors.mqtt.streaming.SubAck;
import org.apache.pekko.stream.connectors.mqtt.streaming.Subscribe;
import
org.apache.pekko.stream.connectors.mqtt.streaming.javadsl.ActorMqttClientSession;
@@ -138,7 +139,7 @@ public class MqttFlowTest {
session.tell(
new Command<>(
new Publish(
- ControlPacketFlags.RETAIN() |
ControlPacketFlags.QoSAtLeastOnceDelivery(),
+ ControlPacketFlags.RETAIN() |
PublishQoSFlags.QoSAtLeastOnceDelivery(),
topic,
ByteString.fromString("ohi"))));
// #run-streaming-flow
@@ -281,7 +282,7 @@ public class MqttFlowTest {
clientSession.tell(
new Command<>(
new Publish(
- ControlPacketFlags.RETAIN() |
ControlPacketFlags.QoSAtLeastOnceDelivery(),
+ ControlPacketFlags.RETAIN() |
PublishQoSFlags.QoSAtLeastOnceDelivery(),
topic,
ByteString.fromString("ohi"))));
diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala
b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala
index 4b65b51b3..7bea10fdf 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala
@@ -217,7 +217,7 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with
LogCapturing {
"encode/decode publish packets" in {
val bsb: ByteStringBuilder = ByteString.newBuilder
val packet = Publish(
- ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtMostOnceDelivery |
ControlPacketFlags.DUP,
+ ControlPacketFlags.RETAIN | PublishQoSFlags.QoSAtMostOnceDelivery |
ControlPacketFlags.DUP,
"some-topic-name",
ByteString("some-payload"))
val bytes = packet.encode(bsb, None).result()
@@ -235,7 +235,7 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with
LogCapturing {
"invalid QoS when decoding publish packets" in {
val bsb = ByteString.newBuilder
- .putByte((ControlPacketType.PUBLISH.underlying << 4 |
ControlPacketFlags.QoSReserved.underlying).toByte)
+ .putByte((ControlPacketType.PUBLISH.underlying << 4 |
PublishQoSFlags.QoSReserved.underlying).toByte)
.putByte(0)
bsb
.result()
@@ -260,7 +260,7 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with
LogCapturing {
bsb
.result()
.iterator
- .decodePublish(0, ControlPacketFlags.QoSAtLeastOnceDelivery) shouldBe
Left(MqttCodec.BufferUnderflow)
+ .decodePublish(0, PublishQoSFlags.QoSAtLeastOnceDelivery) shouldBe
Left(MqttCodec.BufferUnderflow)
}
"encode/decode publish ack packets" in {
@@ -314,8 +314,8 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with
LogCapturing {
"encode/decode subscribe packets" in {
val bsb: ByteStringBuilder = ByteString.newBuilder
val packet = Subscribe(
- List("some-head-topic" -> ControlPacketFlags.QoSExactlyOnceDelivery,
- "some-tail-topic" -> ControlPacketFlags.QoSExactlyOnceDelivery))
+ List("some-head-topic" -> SubscribeQoSFlags.QoSExactlyOnceDelivery,
+ "some-tail-topic" -> SubscribeQoSFlags.QoSExactlyOnceDelivery))
val bytes = packet.encode(bsb, PacketId(0)).result()
bytes.size shouldBe 40
bytes.iterator.decodeControlPacket(MaxPacketSize) shouldBe Right(packet)
@@ -332,14 +332,14 @@ class MqttCodecSpec extends AnyWordSpec with Matchers
with LogCapturing {
"bad subscribe message when decoding subscribe packets given bad QoS" in {
val bsb: ByteStringBuilder = ByteString.newBuilder
val packet = Subscribe(
- List("some-head-topic" -> ControlPacketFlags.QoSExactlyOnceDelivery,
- "some-tail-topic" -> ControlPacketFlags.QoSReserved))
+ List("some-head-topic" -> SubscribeQoSFlags.QoSExactlyOnceDelivery,
+ "some-tail-topic" -> SubscribeQoSFlags.QoSReserved))
val bytes = packet.encode(bsb, PacketId(1)).result()
bytes.iterator
.decodeControlPacket(MaxPacketSize) shouldBe Left(
BadSubscribeMessage(PacketId(1),
- List(Right("some-head-topic") ->
ControlPacketFlags.QoSExactlyOnceDelivery,
- Right("some-tail-topic") -> ControlPacketFlags.QoSReserved)))
+ List(Right("some-head-topic") ->
SubscribeQoSFlags.QoSExactlyOnceDelivery,
+ Right("some-tail-topic") -> SubscribeQoSFlags.QoSReserved)))
}
"bad subscribe message when decoding subscribe packets given no topics" in
{
@@ -358,7 +358,7 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with
LogCapturing {
"encode/decode sub ack packets" in {
val bsb: ByteStringBuilder = ByteString.newBuilder
val packet =
- SubAck(PacketId(1), List(ControlPacketFlags.QoSExactlyOnceDelivery,
ControlPacketFlags.QoSExactlyOnceDelivery))
+ SubAck(PacketId(1), List(SubscribeQoSFlags.QoSExactlyOnceDelivery,
SubscribeQoSFlags.QoSExactlyOnceDelivery))
val bytes = packet.encode(bsb).result()
bytes.size shouldBe 6
bytes.iterator.decodeControlPacket(MaxPacketSize) shouldBe Right(packet)
@@ -366,11 +366,11 @@ class MqttCodecSpec extends AnyWordSpec with Matchers
with LogCapturing {
"regular sub ack message when decoding sub ack packets given failure QoS"
in {
val bsb: ByteStringBuilder = ByteString.newBuilder
- val packet = SubAck(PacketId(1),
List(ControlPacketFlags.QoSExactlyOnceDelivery, ControlPacketFlags.QoSFailure))
+ val packet = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSExactlyOnceDelivery, SubscribeQoSFlags.QoSFailure))
val bytes = packet.encode(bsb).result()
bytes.iterator
.decodeControlPacket(MaxPacketSize) shouldBe Right(
- SubAck(PacketId(1), List(ControlPacketFlags.QoSExactlyOnceDelivery,
ControlPacketFlags.QoSFailure)))
+ SubAck(PacketId(1), List(SubscribeQoSFlags.QoSExactlyOnceDelivery,
SubscribeQoSFlags.QoSFailure)))
}
"underflow when decoding sub ack packets" in {
diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
index ecb419eea..fa4d0ed1c 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
@@ -89,7 +89,7 @@ abstract class MqttFlowSpecBase(clientId: String, topic:
String, system: ActorSy
commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
commands.offer(Command(Subscribe(topic)))
session ! Command(
- Publish(ControlPacketFlags.RETAIN |
ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")))
+ Publish(ControlPacketFlags.RETAIN |
PublishQoSFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")))
// #run-streaming-flow
events.futureValue match {
@@ -107,9 +107,9 @@ abstract class MqttFlowSpecBase(clientId: String, topic:
String, system: ActorSy
}
"mqtt server flow" should {
- // Ignored due to ://github.com/akka/alpakka/issues/1549, possibly
- // fixed with https://github.com/akka/alpakka/pull/2189
- "receive a bidirectional connection and a subscription to a topic" ignore {
+ // Related to https://github.com/akka/alpakka/issues/1549, possibly
+ // fixed with https://github.com/akka/alpakka/pull/2189 and
https://github.com/akka/alpakka/pull/2969
+ "receive a bidirectional connection and a subscription to a topic" in
assertAllStagesStopped {
val host = "localhost"
@@ -181,7 +181,7 @@ abstract class MqttFlowSpecBase(clientId: String, topic:
String, system: ActorSy
commands.offer(Command(Connect(clientId, ConnectFlags.None)))
commands.offer(Command(Subscribe(topic)))
clientSession ! Command(
- Publish(ControlPacketFlags.RETAIN |
ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")))
+ Publish(ControlPacketFlags.RETAIN |
PublishQoSFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")))
events.futureValue match {
case Publish(_, `topic`, _, bytes) => bytes shouldBe ByteString("ohi")
@@ -196,4 +196,34 @@ abstract class MqttFlowSpecBase(clientId: String, topic:
String, system: ActorSy
commands.watchCompletion().foreach(_ => clientSession.shutdown())
}
}
+
+ "mqtt client" should {
+ Seq(SubscribeQoSFlags.QoSAtMostOnceDelivery,
+ SubscribeQoSFlags.QoSAtLeastOnceDelivery,
+ SubscribeQoSFlags.QoSExactlyOnceDelivery).foreach { qos =>
+ s"subscribe at QoS ${qos.underlying.toString} level" in
assertAllStagesStopped {
+ val id = qos.underlying.toString
+
+ val settings = MqttSessionSettings()
+ val session = ActorMqttClientSession(settings)
+ val conn = Tcp().outgoingConnection("localhost", 1883)
+ val mqttFlow = Mqtt.clientSessionFlow(session,
ByteString(id)).join(conn)
+ val (commands, events) = Source
+ .queue(10, OverflowStrategy.fail)
+ .via(mqttFlow)
+ .collect {
+ case Right(Event(p: SubAck, _)) => p
+ }
+ .toMat(Sink.head)(Keep.both)
+ .run()
+
+ commands.offer(Command(Connect(id, ConnectFlags.CleanSession)))
+ commands.offer(Command(Subscribe(Seq(s"topic$id" -> qos))))
+
+ events.futureValue match {
+ case SubAck(_, returnCodes) => returnCodes.head.underlying shouldBe
qos.underlying
+ }
+ }
+ }
+ }
}
diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
index 428f138cd..0a6385af7 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
@@ -86,7 +86,7 @@ class MqttSessionSpec
val subscribe = Subscribe("some-topic")
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
val publish = Publish("some-topic", ByteString("some-payload"))
@@ -172,7 +172,7 @@ class MqttSessionSpec
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
client.offer(Command(connect))
@@ -442,10 +442,10 @@ class MqttSessionSpec
val subscribe = Subscribe("some-topic")
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
- val publish = Publish(ControlPacketFlags.QoSAtMostOnceDelivery,
"some-topic", ByteString("some-payload"))
+ val publish = Publish(PublishQoSFlags.QoSAtMostOnceDelivery,
"some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder, None).result()
client.offer(Command(connect))
@@ -492,10 +492,10 @@ class MqttSessionSpec
val subscribe = Subscribe("some-topic")
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
- val publish = Publish(ControlPacketFlags.QoSAtLeastOnceDelivery,
"some-topic", ByteString("some-payload"))
+ val publish = Publish(PublishQoSFlags.QoSAtLeastOnceDelivery,
"some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder,
Some(PacketId(1))).result()
val pubAck = PubAck(PacketId(1))
val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
@@ -553,10 +553,10 @@ class MqttSessionSpec
val subscribe = Subscribe("some-topic")
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
- val publish = Publish(ControlPacketFlags.QoSAtLeastOnceDelivery,
"some-topic", ByteString("some-payload"))
+ val publish = Publish(PublishQoSFlags.QoSAtLeastOnceDelivery,
"some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder,
Some(PacketId(1))).result()
val pubAck = PubAck(PacketId(1))
val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
@@ -630,13 +630,13 @@ class MqttSessionSpec
val subscribe = Subscribe("some-topic")
val subscribe1Bytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck1 = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck1 = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAck1Bytes = subAck1.encode(ByteString.newBuilder).result()
val subscribe2Bytes = subscribe.encode(ByteString.newBuilder,
PacketId(2)).result()
- val subAck2 = SubAck(PacketId(2),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck2 = SubAck(PacketId(2),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAck2Bytes = subAck2.encode(ByteString.newBuilder).result()
- val publish = Publish(ControlPacketFlags.QoSAtLeastOnceDelivery,
"some-topic", ByteString("some-payload"))
+ val publish = Publish(PublishQoSFlags.QoSAtLeastOnceDelivery,
"some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder,
Some(PacketId(1))).result()
val pubAck = PubAck(PacketId(1))
@@ -702,7 +702,7 @@ class MqttSessionSpec
val connAck = ConnAck(ConnAckFlags.None,
ConnAckReturnCode.ConnectionAccepted)
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
- val publish = Publish(ControlPacketFlags.QoSAtLeastOnceDelivery |
ControlPacketFlags.DUP,
+ val publish = Publish(PublishQoSFlags.QoSAtLeastOnceDelivery |
ControlPacketFlags.DUP,
"some-topic",
ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder,
Some(PacketId(1))).result()
@@ -754,10 +754,10 @@ class MqttSessionSpec
val subscribe = Subscribe("some-topic")
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
- val publish = Publish(ControlPacketFlags.QoSExactlyOnceDelivery,
"some-topic", ByteString("some-payload"))
+ val publish = Publish(PublishQoSFlags.QoSExactlyOnceDelivery,
"some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder,
Some(PacketId(1))).result()
val pubRec = PubRec(PacketId(1))
val pubRecBytes = pubRec.encode(ByteString.newBuilder).result()
@@ -813,7 +813,7 @@ class MqttSessionSpec
val connAck = ConnAck(ConnAckFlags.None,
ConnAckReturnCode.ConnectionAccepted)
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
- val publish = Publish(ControlPacketFlags.QoSAtMostOnceDelivery,
"some-topic", ByteString("some-payload"))
+ val publish = Publish(PublishQoSFlags.QoSAtMostOnceDelivery,
"some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder, None).result()
client.offer(Command(connect))
@@ -871,6 +871,55 @@ class MqttSessionSpec
client.watchCompletion().foreach(_ => session.shutdown())
}
+ "publish with QoS 1 and ignore a spurious PubRec, completing on PubAck" in
assertAllStagesStopped {
+ val session = ActorMqttClientSession(settings)
+
+ val server = TestProbe()
+ val pipeToServer = Flow[ByteString].mapAsync(1)(msg =>
server.ref.ask(msg).mapTo[ByteString])
+
+ val (client, result) =
+ Source
+ .queue(1, OverflowStrategy.fail)
+ .via(
+ Mqtt
+ .clientSessionFlow[String](session, ByteString("1"))
+ .join(pipeToServer))
+ .drop(1)
+ .toMat(Sink.head)(Keep.both)
+ .run()
+
+ val connect = Connect("some-client-id", ConnectFlags.None)
+ val connectBytes = connect.encode(ByteString.newBuilder).result()
+ val connAck = ConnAck(ConnAckFlags.None,
ConnAckReturnCode.ConnectionAccepted)
+ val connAckBytes = connAck.encode(ByteString.newBuilder).result()
+
+ val publish = Publish("some-topic", ByteString("some-payload"))
+ val publishBytes = publish.encode(ByteString.newBuilder,
Some(PacketId(1))).result()
+ val carry = "some-carry"
+ // A spurious PubRec for a QoS 1 publish must be ignored; only PubAck should complete it.
+ val pubRec = PubRec(PacketId(1))
+ val pubRecBytes = pubRec.encode(ByteString.newBuilder).result()
+ val pubAck = PubAck(PacketId(1))
+ val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
+
+ client.offer(Command(connect))
+
+ server.expectMsg(connectBytes)
+ server.reply(connAckBytes)
+
+ session ! Command(publish, carry)
+
+ server.expectMsg(publishBytes)
+ // Reply with spurious PubRec immediately followed by the correct PubAck in the same byte stream.
+ // The MQTT framer splits these into two frames: PubRec is ignored (QoS 1 doesn't use PUBREC),
+ // and PubAck correctly completes the QoS 1 publish.
+ server.reply(pubRecBytes ++ pubAckBytes)
+
+ result.futureValue shouldBe Right(Event(pubAck, Some(carry)))
+ client.complete()
+ client.watchCompletion().foreach(_ => session.shutdown())
+ }
+
"publish twice with a QoS of 1 so that the second is queued" in
assertAllStagesStopped {
val session = ActorMqttClientSession(settings)
@@ -1059,7 +1108,7 @@ class MqttSessionSpec
val connAck = ConnAck(ConnAckFlags.None,
ConnAckReturnCode.ConnectionAccepted)
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
- val publish = Publish(ControlPacketFlags.QoSExactlyOnceDelivery,
"some-topic", ByteString("some-payload"))
+ val publish = Publish(PublishQoSFlags.QoSExactlyOnceDelivery,
"some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder,
Some(PacketId(1))).result()
val carry = "some-carry"
val pubRec = PubRec(PacketId(1))
@@ -1200,7 +1249,7 @@ class MqttSessionSpec
val subscribe = Subscribe("some-topic")
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
val unsubscribe = Unsubscribe("some-topic")
@@ -1328,7 +1377,7 @@ class MqttSessionSpec
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
val publishBytes = publish.encode(ByteString.newBuilder,
Some(PacketId(1))).result()
@@ -1426,11 +1475,11 @@ class MqttSessionSpec
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
val subscribe1Bytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val sub1Ack = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val sub1Ack = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val sub1AckBytes = sub1Ack.encode(ByteString.newBuilder).result()
val subscribe2Bytes = subscribe.encode(ByteString.newBuilder,
PacketId(2)).result()
- val sub2Ack = SubAck(PacketId(2),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val sub2Ack = SubAck(PacketId(2),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val sub2AckBytes = sub2Ack.encode(ByteString.newBuilder).result()
fromClientQueue.offer(connectBytes)
@@ -1503,7 +1552,7 @@ class MqttSessionSpec
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
val publish = Publish("some-topic", ByteString("some-payload"))
@@ -1771,7 +1820,7 @@ class MqttSessionSpec
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
val disconnectBytes = disconnect.encode(ByteString.newBuilder).result()
@@ -1980,7 +2029,7 @@ class MqttSessionSpec
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
val subscribeBytes = subscribe.encode(ByteString.newBuilder,
PacketId(1)).result()
- val subAck = SubAck(PacketId(1),
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAck = SubAck(PacketId(1),
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
val publish = Publish("some-topic", ByteString("some-payload"))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]