This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 68449a0cd Port alpakka PRs #2969 and #2971: fix MQTT QoS encoding and UnsubAck decoding (#1594)
68449a0cd is described below

commit 68449a0cdd24879fc0887099746801039850e5da
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Apr 28 09:31:44 2026 +0200

    Port alpakka PRs #2969 and #2971: fix MQTT QoS encoding and UnsubAck decoding (#1594)
    
    * Port alpakka PRs #2969 and #2971: fix MQTT QoS encoding and UnsubAck decoding
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/0bc4a6e9-bdec-4131-81ed-f4d63b4c4806
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix test assertion in subscribe QoS test
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/0bc4a6e9-bdec-4131-81ed-f4d63b4c4806
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * mima
    
    * Update qos.backwards.excludes
    
    * Re-ignore flaky server flow test - race condition not fully resolved
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/589f1e14-d041-45c4-88bc-044468037a4c
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix race condition: stash PublishReceivedLocally until subscription is registered
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/41ef4746-8bce-487e-ae91-b7c367374fae
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Potential fix for pull request finding
    
    Co-authored-by: Copilot Autofix powered by AI 
<[email protected]>
    
    * Bound pre-subscription PublishReceivedLocally stash by serverSendBufferSize
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/b0eac44e-5a32-46cc-a830-6f44fe48abde
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update assertion in MqttFlowSpec for subscription QoS
    
    * Fix publishUnacknowledged PubRec guard: QoSAtMostOnceDelivery → QoSExactlyOnceDelivery; add regression test
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/b5f9765c-4beb-4d69-823b-0283a9ef9257
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI 
<[email protected]>
---
 .../2.0.x.backward.excludes/qos.backwards.excludes | 24 ++++++
 .../mqtt/streaming/impl/ClientState.scala          |  4 +-
 .../mqtt/streaming/impl/RequestState.scala         |  8 +-
 .../mqtt/streaming/impl/ServerState.scala          | 22 +++--
 .../stream/connectors/mqtt/streaming/model.scala   | 49 +++++++----
 .../src/test/java/docs/javadsl/MqttFlowTest.java   |  5 +-
 .../test/scala/docs/scaladsl/MqttCodecSpec.scala   | 24 +++---
 .../test/scala/docs/scaladsl/MqttFlowSpec.scala    | 40 +++++++--
 .../test/scala/docs/scaladsl/MqttSessionSpec.scala | 95 ++++++++++++++++------
 9 files changed, 201 insertions(+), 70 deletions(-)

diff --git 
a/mqtt-streaming/src/main/mima-filters/2.0.x.backward.excludes/qos.backwards.excludes
 
b/mqtt-streaming/src/main/mima-filters/2.0.x.backward.excludes/qos.backwards.excludes
new file mode 100644
index 000000000..6c5128293
--- /dev/null
+++ 
b/mqtt-streaming/src/main/mima-filters/2.0.x.backward.excludes/qos.backwards.excludes
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# fix MQTT QoS encoding and UnsubAck decoding (https://github.com/apache/pekko-connectors/pull/1594)
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSFailure")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSReserved")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSExactlyOnceDelivery")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSAtLeastOnceDelivery")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.QoSAtMostOnceDelivery")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.ReservedUnsubAck")
diff --git 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
index ac3c90fb9..6e819657e 100644
--- 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
+++ 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
@@ -428,7 +428,7 @@ import scala.util.{ Either, Failure, Success }
             serverConnected(data, resetPingReqTimer = true)
 
           case (_, PublishReceivedFromRemote(_, publish, local))
-              if (publish.flags & ControlPacketFlags.QoSReserved).underlying 
== 0 =>
+              if (publish.flags & PublishQoSFlags.QoSReserved).underlying == 0 
=>
             local.success(Consumer.ForwardPublish)
             serverConnected(data, resetPingReqTimer = false)
 
@@ -479,7 +479,7 @@ import scala.util.{ Either, Failure, Success }
             }
 
           case (context, PublishReceivedLocally(publish, _))
-              if (publish.flags & ControlPacketFlags.QoSReserved).underlying 
== 0 =>
+              if (publish.flags & PublishQoSFlags.QoSReserved).underlying == 0 
=>
             QueueOfferState.waitForQueueOfferCompleted(
               data.remote.offer(ForwardPublish(publish, None)),
               result => QueueOfferCompleted(ByteString.empty, result.toEither),
diff --git 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
index 0081f9bec..67a7eb7df 100644
--- 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
+++ 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
@@ -158,12 +158,12 @@ import scala.util.{ Either, Failure, Success }
       Behaviors
         .receive[Event] {
           case (_, PubAckReceivedFromRemote(local))
-              if 
data.publish.flags.contains(ControlPacketFlags.QoSAtLeastOnceDelivery) =>
+              if 
data.publish.flags.contains(PublishQoSFlags.QoSAtLeastOnceDelivery) =>
             local.success(ForwardPubAck(data.publishData))
             Behaviors.stopped
 
           case (_, PubRecReceivedFromRemote(local))
-              if 
data.publish.flags.contains(ControlPacketFlags.QoSAtMostOnceDelivery) =>
+              if 
data.publish.flags.contains(PublishQoSFlags.QoSExactlyOnceDelivery) =>
             local.success(ForwardPubRec(data.publishData))
             timer.cancel(ReceivePubackrec)
             publishAcknowledged(data)
@@ -315,10 +315,10 @@ import scala.util.{ Either, Failure, Success }
     timer.startSingleTimer(ReceivePubackrel, ReceivePubAckRecTimeout, 
data.settings.consumerPubAckRecTimeout)
     Behaviors
       .receiveMessagePartial[Event] {
-        case PubAckReceivedLocally(remote) if 
data.publish.flags.contains(ControlPacketFlags.QoSAtLeastOnceDelivery) =>
+        case PubAckReceivedLocally(remote) if 
data.publish.flags.contains(PublishQoSFlags.QoSAtLeastOnceDelivery) =>
           remote.success(ForwardPubAck)
           Behaviors.stopped
-        case PubRecReceivedLocally(remote) if 
data.publish.flags.contains(ControlPacketFlags.QoSExactlyOnceDelivery) =>
+        case PubRecReceivedLocally(remote) if 
data.publish.flags.contains(PublishQoSFlags.QoSExactlyOnceDelivery) =>
           remote.success(ForwardPubRec)
           timer.cancel(ReceivePubackrel)
           consumeReceived(data)
diff --git 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala
 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala
index 107ce2cbd..fb6a0366f 100644
--- 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala
+++ 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala
@@ -486,10 +486,14 @@ import scala.util.{ Failure, Success }
                   data.settings)))
             subscribed.future.foreach(_ => context.self ! 
Subscribed(subscribe))(context.executionContext)
             clientConnected(data)
-          case (_, Subscribed(subscribe)) =>
-            clientConnected(
-              data.copy(
-                publishers = data.publishers ++ 
subscribe.topicFilters.map(_._1)))
+          case (context, Subscribed(subscribe)) =>
+            val newPublishers = data.publishers ++ 
subscribe.topicFilters.map(_._1)
+            val (toProcess, remaining) = 
data.pendingLocalPublications.partition { case (topic, _) =>
+              !data.publishers.exists(Topics.filter(_, topic)) &&
+              newPublishers.exists(Topics.filter(_, topic))
+            }
+            toProcess.foreach { case (_, prl) => context.self ! prl }
+            clientConnected(data.copy(publishers = newPublishers, 
pendingLocalPublications = remaining))
           case (context, UnsubscribeReceivedFromRemote(unsubscribe, local)) =>
             val unsubscribed = Promise[Done]()
             context.watch(
@@ -505,7 +509,7 @@ import scala.util.{ Failure, Success }
           case (_, Unsubscribed(unsubscribe)) =>
             clientConnected(data.copy(publishers = data.publishers -- 
unsubscribe.topicFilters))
           case (_, PublishReceivedFromRemote(publish, local))
-              if (publish.flags & ControlPacketFlags.QoSReserved).underlying 
== 0 =>
+              if (publish.flags & PublishQoSFlags.QoSReserved).underlying == 0 
=>
             local.success(Consumer.ForwardPublish)
             clientConnected(data)
           case (context, prfr @ PublishReceivedFromRemote(publish @ Publish(_, 
topicName, Some(packetId), _), local)) =>
@@ -552,7 +556,7 @@ import scala.util.{ Failure, Success }
               clientConnected(data.copy(activeConsumers = data.activeConsumers 
- topicName))
             }
           case (context, PublishReceivedLocally(publish, _))
-              if (publish.flags & ControlPacketFlags.QoSReserved).underlying 
== 0 &&
+              if (publish.flags & PublishQoSFlags.QoSReserved).underlying == 0 
&&
               data.publishers.exists(Topics.filter(_, publish.topicName)) =>
             QueueOfferState.waitForQueueOfferCompleted(
               data.remote
@@ -579,6 +583,12 @@ import scala.util.{ Failure, Success }
               clientConnected(
                 data.copy(pendingLocalPublications = 
data.pendingLocalPublications :+ (publish.topicName -> prl)))
             }
+          case (_, prl @ PublishReceivedLocally(publish, _))
+              if data.pendingLocalPublications.size < 
data.settings.serverSendBufferSize =>
+            // Topic not yet subscribed - stash until subscription arrives via 
Subscribed event.
+            // Bounded by serverSendBufferSize to prevent unbounded growth; 
excess publishes are dropped.
+            clientConnected(
+              data.copy(pendingLocalPublications = 
data.pendingLocalPublications :+ (publish.topicName -> prl)))
           case (context, ProducerFree(topicName)) =>
             val i = data.pendingLocalPublications.indexWhere(_._1 == topicName)
             if (i >= 0) {
diff --git 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala
 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala
index 2bdee1c89..4e1d37175 100644
--- 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala
+++ 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala
@@ -33,7 +33,7 @@ import scala.jdk.OptionConverters._
 
 /**
  * 2.2.1 MQTT Control Packet type
- * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
+ * 
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.1_-
  */
 object ControlPacketType {
   val Reserved1 = ControlPacketType(0)
@@ -57,8 +57,8 @@ object ControlPacketType {
 final case class ControlPacketType(underlying: Int) extends AnyVal
 
 /**
- * 2.2.2 Flags
- * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
+ * 3.3.1 Publish message header flags
+ * 
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
  */
 object ControlPacketFlags {
   val None = ControlPacketFlags(0)
@@ -66,14 +66,31 @@ object ControlPacketFlags {
   val ReservedPubRel = ControlPacketFlags(1 << 1)
   val ReservedSubscribe = ControlPacketFlags(1 << 1)
   val ReservedUnsubscribe = ControlPacketFlags(1 << 1)
-  val ReservedUnsubAck = ControlPacketFlags(1 << 1)
   val DUP = ControlPacketFlags(1 << 3)
+  val RETAIN = ControlPacketFlags(1)
+}
+
+/**
+ * 3.3.1 Publish QoS flags
+ * 
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.11_-
+ */
+object PublishQoSFlags {
   val QoSAtMostOnceDelivery = ControlPacketFlags(0)
   val QoSAtLeastOnceDelivery = ControlPacketFlags(1 << 1)
   val QoSExactlyOnceDelivery = ControlPacketFlags(2 << 1)
   val QoSReserved = ControlPacketFlags(3 << 1)
-  val QoSFailure = ControlPacketFlags(1 << 7)
-  val RETAIN = ControlPacketFlags(1)
+}
+
+/**
+ * 3.8.3 Subscribe payload QoS
+ * 
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.26_-
+ */
+object SubscribeQoSFlags {
+  val QoSAtMostOnceDelivery = ControlPacketFlags(0)
+  val QoSAtLeastOnceDelivery = ControlPacketFlags(1)
+  val QoSExactlyOnceDelivery = ControlPacketFlags(2)
+  val QoSReserved = ControlPacketFlags(3)
+  val QoSFailure = ControlPacketFlags(0x80)
 }
 
 @InternalApi
@@ -304,7 +321,7 @@ final case class Publish(override val flags: 
ControlPacketFlags,
    * Conveniently create a publish message with at least once delivery
    */
   def this(topicName: String, payload: ByteString) =
-    this(ControlPacketFlags.QoSAtLeastOnceDelivery, topicName, 
Some(PacketId(0)), payload)
+    this(PublishQoSFlags.QoSAtLeastOnceDelivery, topicName, Some(PacketId(0)), 
payload)
 
   override def toString: String =
     
s"""Publish(flags:$flags,topicName:$topicName,packetId:$packetId,payload:${payload.size}b)"""
@@ -382,7 +399,7 @@ final case class Subscribe @InternalApi private[streaming] 
(packetId: PacketId,
    * A convenience for subscribing to a single topic with at-least-once 
semantics
    */
   def this(topicFilter: String) =
-    this(PacketId(0), List(topicFilter -> 
ControlPacketFlags.QoSAtLeastOnceDelivery))
+    this(PacketId(0), List(topicFilter -> 
SubscribeQoSFlags.QoSAtLeastOnceDelivery))
 }
 
 /**
@@ -450,10 +467,10 @@ final case class Unsubscribe @InternalApi 
private[streaming] (packetId: PacketId
 
 /**
  * 3.11 UNSUBACK – Unsubscribe acknowledgement
- * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
+ * 
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077
  */
 final case class UnsubAck(packetId: PacketId)
-    extends ControlPacket(ControlPacketType.UNSUBACK, 
ControlPacketFlags.ReservedUnsubAck)
+    extends ControlPacket(ControlPacketType.UNSUBACK, 
ControlPacketFlags.ReservedGeneral)
 
 /**
  * 3.12 PINGREQ – PING request
@@ -816,7 +833,7 @@ object MqttCodec {
                 v.decodeSubAck(l)
               case (ControlPacketType.UNSUBSCRIBE, 
ControlPacketFlags.ReservedUnsubscribe) =>
                 v.decodeUnsubscribe(l)
-              case (ControlPacketType.UNSUBACK, 
ControlPacketFlags.ReservedUnsubAck) =>
+              case (ControlPacketType.UNSUBACK, 
ControlPacketFlags.ReservedGeneral) =>
                 v.decodeUnsubAck()
               case (ControlPacketType.PINGREQ, 
ControlPacketFlags.ReservedGeneral) =>
                 Right(PingReq)
@@ -905,12 +922,12 @@ object MqttCodec {
     // 3.3 PUBLISH – Publish message
     def decodePublish(l: Int, flags: ControlPacketFlags): Either[DecodeError, 
Publish] =
       try {
-        if (!flags.contains(ControlPacketFlags.QoSReserved)) {
+        if (!flags.contains(PublishQoSFlags.QoSReserved)) {
           val packetLen = v.len
           val topicName = v.decodeString()
           val packetId =
-            if (flags.contains(ControlPacketFlags.QoSAtLeastOnceDelivery) ||
-              flags.contains(ControlPacketFlags.QoSExactlyOnceDelivery))
+            if (flags.contains(PublishQoSFlags.QoSAtLeastOnceDelivery) ||
+              flags.contains(PublishQoSFlags.QoSExactlyOnceDelivery))
               Some(PacketId(v.getShort & 0xFFFF))
             else None
           val payload = v.getByteString(l - (packetLen - v.len))
@@ -982,8 +999,8 @@ object MqttCodec {
           }
         val topicFilters = decodeTopicFilters(l - (packetLen - v.len), 
Vector.empty)
         val topicFiltersValid = topicFilters.nonEmpty && 
topicFilters.foldLeft(true) {
-          case (true, (Right(_), tff)) if tff.underlying < 
ControlPacketFlags.QoSReserved.underlying => true
-          case _                                                               
                      => false
+          case (true, (Right(_), tff)) if tff.underlying < 
SubscribeQoSFlags.QoSReserved.underlying => true
+          case _                                                               
                     => false
         }
         if (topicFiltersValid) {
           Right(Subscribe(packetId,
diff --git a/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java 
b/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
index de5c3033e..ff2e2c5b6 100644
--- a/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
+++ b/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
@@ -35,6 +35,7 @@ import 
org.apache.pekko.stream.connectors.mqtt.streaming.Event;
 import org.apache.pekko.stream.connectors.mqtt.streaming.MqttSessionSettings;
 import org.apache.pekko.stream.connectors.mqtt.streaming.PubAck;
 import org.apache.pekko.stream.connectors.mqtt.streaming.Publish;
+import org.apache.pekko.stream.connectors.mqtt.streaming.PublishQoSFlags;
 import org.apache.pekko.stream.connectors.mqtt.streaming.SubAck;
 import org.apache.pekko.stream.connectors.mqtt.streaming.Subscribe;
 import 
org.apache.pekko.stream.connectors.mqtt.streaming.javadsl.ActorMqttClientSession;
@@ -138,7 +139,7 @@ public class MqttFlowTest {
     session.tell(
         new Command<>(
             new Publish(
-                ControlPacketFlags.RETAIN() | 
ControlPacketFlags.QoSAtLeastOnceDelivery(),
+                ControlPacketFlags.RETAIN() | 
PublishQoSFlags.QoSAtLeastOnceDelivery(),
                 topic,
                 ByteString.fromString("ohi"))));
     // #run-streaming-flow
@@ -281,7 +282,7 @@ public class MqttFlowTest {
     clientSession.tell(
         new Command<>(
             new Publish(
-                ControlPacketFlags.RETAIN() | 
ControlPacketFlags.QoSAtLeastOnceDelivery(),
+                ControlPacketFlags.RETAIN() | 
PublishQoSFlags.QoSAtLeastOnceDelivery(),
                 topic,
                 ByteString.fromString("ohi"))));
 
diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala 
b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala
index 4b65b51b3..7bea10fdf 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala
@@ -217,7 +217,7 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with 
LogCapturing {
     "encode/decode publish packets" in {
       val bsb: ByteStringBuilder = ByteString.newBuilder
       val packet = Publish(
-        ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtMostOnceDelivery | 
ControlPacketFlags.DUP,
+        ControlPacketFlags.RETAIN | PublishQoSFlags.QoSAtMostOnceDelivery | 
ControlPacketFlags.DUP,
         "some-topic-name",
         ByteString("some-payload"))
       val bytes = packet.encode(bsb, None).result()
@@ -235,7 +235,7 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with 
LogCapturing {
 
     "invalid QoS when decoding publish packets" in {
       val bsb = ByteString.newBuilder
-        .putByte((ControlPacketType.PUBLISH.underlying << 4 | 
ControlPacketFlags.QoSReserved.underlying).toByte)
+        .putByte((ControlPacketType.PUBLISH.underlying << 4 | 
PublishQoSFlags.QoSReserved.underlying).toByte)
         .putByte(0)
       bsb
         .result()
@@ -260,7 +260,7 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with 
LogCapturing {
       bsb
         .result()
         .iterator
-        .decodePublish(0, ControlPacketFlags.QoSAtLeastOnceDelivery) shouldBe 
Left(MqttCodec.BufferUnderflow)
+        .decodePublish(0, PublishQoSFlags.QoSAtLeastOnceDelivery) shouldBe 
Left(MqttCodec.BufferUnderflow)
     }
 
     "encode/decode publish ack packets" in {
@@ -314,8 +314,8 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with 
LogCapturing {
     "encode/decode subscribe packets" in {
       val bsb: ByteStringBuilder = ByteString.newBuilder
       val packet = Subscribe(
-        List("some-head-topic" -> ControlPacketFlags.QoSExactlyOnceDelivery,
-          "some-tail-topic" -> ControlPacketFlags.QoSExactlyOnceDelivery))
+        List("some-head-topic" -> SubscribeQoSFlags.QoSExactlyOnceDelivery,
+          "some-tail-topic" -> SubscribeQoSFlags.QoSExactlyOnceDelivery))
       val bytes = packet.encode(bsb, PacketId(0)).result()
       bytes.size shouldBe 40
       bytes.iterator.decodeControlPacket(MaxPacketSize) shouldBe Right(packet)
@@ -332,14 +332,14 @@ class MqttCodecSpec extends AnyWordSpec with Matchers 
with LogCapturing {
     "bad subscribe message when decoding subscribe packets given bad QoS" in {
       val bsb: ByteStringBuilder = ByteString.newBuilder
       val packet = Subscribe(
-        List("some-head-topic" -> ControlPacketFlags.QoSExactlyOnceDelivery,
-          "some-tail-topic" -> ControlPacketFlags.QoSReserved))
+        List("some-head-topic" -> SubscribeQoSFlags.QoSExactlyOnceDelivery,
+          "some-tail-topic" -> SubscribeQoSFlags.QoSReserved))
       val bytes = packet.encode(bsb, PacketId(1)).result()
       bytes.iterator
         .decodeControlPacket(MaxPacketSize) shouldBe Left(
         BadSubscribeMessage(PacketId(1),
-          List(Right("some-head-topic") -> 
ControlPacketFlags.QoSExactlyOnceDelivery,
-            Right("some-tail-topic") -> ControlPacketFlags.QoSReserved)))
+          List(Right("some-head-topic") -> 
SubscribeQoSFlags.QoSExactlyOnceDelivery,
+            Right("some-tail-topic") -> SubscribeQoSFlags.QoSReserved)))
     }
 
     "bad subscribe message when decoding subscribe packets given no topics" in 
{
@@ -358,7 +358,7 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with 
LogCapturing {
     "encode/decode sub ack packets" in {
       val bsb: ByteStringBuilder = ByteString.newBuilder
       val packet =
-        SubAck(PacketId(1), List(ControlPacketFlags.QoSExactlyOnceDelivery, 
ControlPacketFlags.QoSExactlyOnceDelivery))
+        SubAck(PacketId(1), List(SubscribeQoSFlags.QoSExactlyOnceDelivery, 
SubscribeQoSFlags.QoSExactlyOnceDelivery))
       val bytes = packet.encode(bsb).result()
       bytes.size shouldBe 6
       bytes.iterator.decodeControlPacket(MaxPacketSize) shouldBe Right(packet)
@@ -366,11 +366,11 @@ class MqttCodecSpec extends AnyWordSpec with Matchers 
with LogCapturing {
 
     "regular sub ack message when decoding sub ack packets given failure QoS" 
in {
       val bsb: ByteStringBuilder = ByteString.newBuilder
-      val packet = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSExactlyOnceDelivery, ControlPacketFlags.QoSFailure))
+      val packet = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSExactlyOnceDelivery, SubscribeQoSFlags.QoSFailure))
       val bytes = packet.encode(bsb).result()
       bytes.iterator
         .decodeControlPacket(MaxPacketSize) shouldBe Right(
-        SubAck(PacketId(1), List(ControlPacketFlags.QoSExactlyOnceDelivery, 
ControlPacketFlags.QoSFailure)))
+        SubAck(PacketId(1), List(SubscribeQoSFlags.QoSExactlyOnceDelivery, 
SubscribeQoSFlags.QoSFailure)))
     }
 
     "underflow when decoding sub ack packets" in {
diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala 
b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
index ecb419eea..fa4d0ed1c 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
@@ -89,7 +89,7 @@ abstract class MqttFlowSpecBase(clientId: String, topic: 
String, system: ActorSy
       commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
       commands.offer(Command(Subscribe(topic)))
       session ! Command(
-        Publish(ControlPacketFlags.RETAIN | 
ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")))
+        Publish(ControlPacketFlags.RETAIN | 
PublishQoSFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")))
       // #run-streaming-flow
 
       events.futureValue match {
@@ -107,9 +107,9 @@ abstract class MqttFlowSpecBase(clientId: String, topic: 
String, system: ActorSy
   }
 
   "mqtt server flow" should {
-    // Ignored due to ://github.com/akka/alpakka/issues/1549, possibly
-    // fixed with https://github.com/akka/alpakka/pull/2189
-    "receive a bidirectional connection and a subscription to a topic" ignore {
+    // Related to https://github.com/akka/alpakka/issues/1549, possibly
+    // fixed with https://github.com/akka/alpakka/pull/2189 and https://github.com/akka/alpakka/pull/2969
+    "receive a bidirectional connection and a subscription to a topic" in 
assertAllStagesStopped {
 
       val host = "localhost"
 
@@ -181,7 +181,7 @@ abstract class MqttFlowSpecBase(clientId: String, topic: 
String, system: ActorSy
       commands.offer(Command(Connect(clientId, ConnectFlags.None)))
       commands.offer(Command(Subscribe(topic)))
       clientSession ! Command(
-        Publish(ControlPacketFlags.RETAIN | 
ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")))
+        Publish(ControlPacketFlags.RETAIN | 
PublishQoSFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")))
 
       events.futureValue match {
         case Publish(_, `topic`, _, bytes) => bytes shouldBe ByteString("ohi")
@@ -196,4 +196,34 @@ abstract class MqttFlowSpecBase(clientId: String, topic: 
String, system: ActorSy
       commands.watchCompletion().foreach(_ => clientSession.shutdown())
     }
   }
+
+  "mqtt client" should {
+    Seq(SubscribeQoSFlags.QoSAtMostOnceDelivery,
+      SubscribeQoSFlags.QoSAtLeastOnceDelivery,
+      SubscribeQoSFlags.QoSExactlyOnceDelivery).foreach { qos =>
+      s"subscribe at QoS ${qos.underlying.toString} level" in 
assertAllStagesStopped {
+        val id = qos.underlying.toString
+
+        val settings = MqttSessionSettings()
+        val session = ActorMqttClientSession(settings)
+        val conn = Tcp().outgoingConnection("localhost", 1883)
+        val mqttFlow = Mqtt.clientSessionFlow(session, 
ByteString(id)).join(conn)
+        val (commands, events) = Source
+          .queue(10, OverflowStrategy.fail)
+          .via(mqttFlow)
+          .collect {
+            case Right(Event(p: SubAck, _)) => p
+          }
+          .toMat(Sink.head)(Keep.both)
+          .run()
+
+        commands.offer(Command(Connect(id, ConnectFlags.CleanSession)))
+        commands.offer(Command(Subscribe(Seq(s"topic$id" -> qos))))
+
+        events.futureValue match {
+          case SubAck(_, returnCodes) => returnCodes.head.underlying shouldBe 
qos.underlying
+        }
+      }
+    }
+  }
 }
diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala 
b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
index 428f138cd..0a6385af7 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
@@ -86,7 +86,7 @@ class MqttSessionSpec
 
       val subscribe = Subscribe("some-topic")
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
       val publish = Publish("some-topic", ByteString("some-payload"))
@@ -172,7 +172,7 @@ class MqttSessionSpec
       val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
       client.offer(Command(connect))
@@ -442,10 +442,10 @@ class MqttSessionSpec
 
       val subscribe = Subscribe("some-topic")
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
-      val publish = Publish(ControlPacketFlags.QoSAtMostOnceDelivery, 
"some-topic", ByteString("some-payload"))
+      val publish = Publish(PublishQoSFlags.QoSAtMostOnceDelivery, 
"some-topic", ByteString("some-payload"))
       val publishBytes = publish.encode(ByteString.newBuilder, None).result()
 
       client.offer(Command(connect))
@@ -492,10 +492,10 @@ class MqttSessionSpec
 
       val subscribe = Subscribe("some-topic")
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
-      val publish = Publish(ControlPacketFlags.QoSAtLeastOnceDelivery, 
"some-topic", ByteString("some-payload"))
+      val publish = Publish(PublishQoSFlags.QoSAtLeastOnceDelivery, 
"some-topic", ByteString("some-payload"))
       val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
       val pubAck = PubAck(PacketId(1))
       val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
@@ -553,10 +553,10 @@ class MqttSessionSpec
 
       val subscribe = Subscribe("some-topic")
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
-      val publish = Publish(ControlPacketFlags.QoSAtLeastOnceDelivery, 
"some-topic", ByteString("some-payload"))
+      val publish = Publish(PublishQoSFlags.QoSAtLeastOnceDelivery, 
"some-topic", ByteString("some-payload"))
       val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
       val pubAck = PubAck(PacketId(1))
       val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
@@ -630,13 +630,13 @@ class MqttSessionSpec
 
       val subscribe = Subscribe("some-topic")
       val subscribe1Bytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck1 = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck1 = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAck1Bytes = subAck1.encode(ByteString.newBuilder).result()
       val subscribe2Bytes = subscribe.encode(ByteString.newBuilder, 
PacketId(2)).result()
-      val subAck2 = SubAck(PacketId(2), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck2 = SubAck(PacketId(2), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAck2Bytes = subAck2.encode(ByteString.newBuilder).result()
 
-      val publish = Publish(ControlPacketFlags.QoSAtLeastOnceDelivery, 
"some-topic", ByteString("some-payload"))
+      val publish = Publish(PublishQoSFlags.QoSAtLeastOnceDelivery, 
"some-topic", ByteString("some-payload"))
       val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
       val pubAck = PubAck(PacketId(1))
 
@@ -702,7 +702,7 @@ class MqttSessionSpec
       val connAck = ConnAck(ConnAckFlags.None, 
ConnAckReturnCode.ConnectionAccepted)
       val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
-      val publish = Publish(ControlPacketFlags.QoSAtLeastOnceDelivery | 
ControlPacketFlags.DUP,
+      val publish = Publish(PublishQoSFlags.QoSAtLeastOnceDelivery | 
ControlPacketFlags.DUP,
         "some-topic",
         ByteString("some-payload"))
       val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
@@ -754,10 +754,10 @@ class MqttSessionSpec
 
       val subscribe = Subscribe("some-topic")
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
-      val publish = Publish(ControlPacketFlags.QoSExactlyOnceDelivery, 
"some-topic", ByteString("some-payload"))
+      val publish = Publish(PublishQoSFlags.QoSExactlyOnceDelivery, 
"some-topic", ByteString("some-payload"))
       val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
       val pubRec = PubRec(PacketId(1))
       val pubRecBytes = pubRec.encode(ByteString.newBuilder).result()
@@ -813,7 +813,7 @@ class MqttSessionSpec
       val connAck = ConnAck(ConnAckFlags.None, 
ConnAckReturnCode.ConnectionAccepted)
       val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
-      val publish = Publish(ControlPacketFlags.QoSAtMostOnceDelivery, 
"some-topic", ByteString("some-payload"))
+      val publish = Publish(PublishQoSFlags.QoSAtMostOnceDelivery, 
"some-topic", ByteString("some-payload"))
       val publishBytes = publish.encode(ByteString.newBuilder, None).result()
 
       client.offer(Command(connect))
@@ -871,6 +871,55 @@ class MqttSessionSpec
       client.watchCompletion().foreach(_ => session.shutdown())
     }
 
+    "publish with QoS 1 and ignore a spurious PubRec, completing on PubAck" in 
assertAllStagesStopped {
+      val session = ActorMqttClientSession(settings)
+
+      val server = TestProbe()
+      val pipeToServer = Flow[ByteString].mapAsync(1)(msg => 
server.ref.ask(msg).mapTo[ByteString])
+
+      val (client, result) =
+        Source
+          .queue(1, OverflowStrategy.fail)
+          .via(
+            Mqtt
+              .clientSessionFlow[String](session, ByteString("1"))
+              .join(pipeToServer))
+          .drop(1)
+          .toMat(Sink.head)(Keep.both)
+          .run()
+
+      val connect = Connect("some-client-id", ConnectFlags.None)
+      val connectBytes = connect.encode(ByteString.newBuilder).result()
+      val connAck = ConnAck(ConnAckFlags.None, 
ConnAckReturnCode.ConnectionAccepted)
+      val connAckBytes = connAck.encode(ByteString.newBuilder).result()
+
+      val publish = Publish("some-topic", ByteString("some-payload"))
+      val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
+      val carry = "some-carry"
+      // A spurious PubRec for a QoS 1 publish must be ignored; only PubAck should complete it.
+      val pubRec = PubRec(PacketId(1))
+      val pubRecBytes = pubRec.encode(ByteString.newBuilder).result()
+      val pubAck = PubAck(PacketId(1))
+      val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
+
+      client.offer(Command(connect))
+
+      server.expectMsg(connectBytes)
+      server.reply(connAckBytes)
+
+      session ! Command(publish, carry)
+
+      server.expectMsg(publishBytes)
+      // Reply with spurious PubRec immediately followed by the correct PubAck in the same byte stream.
+      // The MQTT framer splits these into two frames: PubRec is ignored (QoS 1 doesn't use PUBREC),
+      // and PubAck correctly completes the QoS 1 publish.
+      server.reply(pubRecBytes ++ pubAckBytes)
+
+      result.futureValue shouldBe Right(Event(pubAck, Some(carry)))
+      client.complete()
+      client.watchCompletion().foreach(_ => session.shutdown())
+    }
+
     "publish twice with a QoS of 1 so that the second is queued" in 
assertAllStagesStopped {
       val session = ActorMqttClientSession(settings)
 
@@ -1059,7 +1108,7 @@ class MqttSessionSpec
       val connAck = ConnAck(ConnAckFlags.None, 
ConnAckReturnCode.ConnectionAccepted)
       val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
-      val publish = Publish(ControlPacketFlags.QoSExactlyOnceDelivery, 
"some-topic", ByteString("some-payload"))
+      val publish = Publish(PublishQoSFlags.QoSExactlyOnceDelivery, 
"some-topic", ByteString("some-payload"))
       val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
       val carry = "some-carry"
       val pubRec = PubRec(PacketId(1))
@@ -1200,7 +1249,7 @@ class MqttSessionSpec
 
       val subscribe = Subscribe("some-topic")
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
       val unsubscribe = Unsubscribe("some-topic")
@@ -1328,7 +1377,7 @@ class MqttSessionSpec
       val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
       val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
@@ -1426,11 +1475,11 @@ class MqttSessionSpec
       val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
       val subscribe1Bytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val sub1Ack = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val sub1Ack = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val sub1AckBytes = sub1Ack.encode(ByteString.newBuilder).result()
 
       val subscribe2Bytes = subscribe.encode(ByteString.newBuilder, 
PacketId(2)).result()
-      val sub2Ack = SubAck(PacketId(2), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val sub2Ack = SubAck(PacketId(2), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val sub2AckBytes = sub2Ack.encode(ByteString.newBuilder).result()
 
       fromClientQueue.offer(connectBytes)
@@ -1503,7 +1552,7 @@ class MqttSessionSpec
       val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
       val publish = Publish("some-topic", ByteString("some-payload"))
@@ -1771,7 +1820,7 @@ class MqttSessionSpec
       val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
       val disconnectBytes = disconnect.encode(ByteString.newBuilder).result()
@@ -1980,7 +2029,7 @@ class MqttSessionSpec
       val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
       val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+      val subAck = SubAck(PacketId(1), 
List(SubscribeQoSFlags.QoSAtLeastOnceDelivery))
       val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
       val publish = Publish("some-topic", ByteString("some-payload"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to