sndnv commented on code in PR #1035: URL: https://github.com/apache/pekko-connectors/pull/1035#discussion_r2039557728
########## mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/settings.scala: ########## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com> + */ + +package org.apache.pekko.stream.connectors.mqttv5 + +import java.nio.charset.StandardCharsets +import java.util.Properties + +import scala.collection.immutable +import scala.concurrent.duration._ + +import org.apache.pekko +import org.apache.pekko.japi.Pair +import org.apache.pekko.util.JavaDurationConverters._ +import org.apache.pekko.util.ccompat.JavaConverters._ +import org.eclipse.paho.mqttv5.client.MqttClientPersistence +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions +import org.eclipse.paho.mqttv5.common.packet.MqttProperties +import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage } + +/** + * Quality of Service constants as defined in + * [[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901234]] + */ +sealed abstract class MqttQoS { + def value: Int +} + +/** + * Quality of Service constants as defined in + * [[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901234]] + */ +object MqttQoS { + + /** + * Quality of Service 0 - indicates that a message should be delivered at most once (zero or one times). The message + * will not be persisted to disk, and will not be acknowledged across the network. This QoS is the fastest, but should + * only be used for messages which are not valuable. + */ + object AtMostOnce extends MqttQoS { + val value: Int = 0 + } + + /** + * Quality of Service 1 - indicates that a message should be delivered at least once (one or more times). The message + * can only be delivered safely if it can be persisted, so the application must supply a means of persistence using + * [[MqttConnectionSettings]]. If a persistence mechanism is not specified, the message will not be delivered in the + * event of a client failure. The message will be acknowledged across the network. + */ + object AtLeastOnce extends MqttQoS { + val value: Int = 1 + } + + /** + * Quality of Service 2 - indicates that a message should be delivered once. The message will be persisted to disk, + * and will be subject to a two-phase acknowledgement across the network. The message can only be delivered safely + * if it can be persisted, so the application must supply a means of persistence using [[MqttConnectionSettings]]. + * If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. + */ + object ExactlyOnce extends MqttQoS { + val value: Int = 2 + } + + /** + * Java API + * + * Quality of Service 0 - indicates that a message should be delivered at most once (zero or one times). The message + * will not be persisted to disk, and will not be acknowledged across the network. This QoS is the fastest, but should + * only be used for messages which are not valuable. + */ + def atMostOnce: MqttQoS = AtMostOnce + + /** + * Java API + * + * Quality of Service 1 - indicates that a message should be delivered at least once (one or more times). The message + * can only be delivered safely if it can be persisted, so the application must supply a means of persistence using + * [[MqttConnectionSettings]]. If a persistence mechanism is not specified, the message will not be delivered in the + * event of a client failure. The message will be acknowledged across the network. + */ + def atLeastOnce: MqttQoS = AtLeastOnce + + /** + * Java API + * + * Quality of Service 2 - indicates that a message should be delivered once. The message will be persisted to disk, + * and will be subject to a two-phase acknowledgement across the network. The message can only be delivered safely + * if it can be persisted, so the application must supply a means of persistence using [[MqttConnectionSettings]]. + * If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. + */ + def exactlyOnce: MqttQoS = ExactlyOnce +} + +/** + * The mapping of topics to subscribe to and the requested Quality of Service ([[MqttQoS]]) per topic. + */ +final class MqttSubscriptions private ( + val subscriptions: Map[String, MqttQoS]) { + + /** Scala API */ + def withSubscriptions(subscriptions: Map[String, MqttQoS]): MqttSubscriptions = + new MqttSubscriptions(subscriptions) + + /** Java API */ + def withSubscriptions(subscriptions: java.util.List[pekko.japi.Pair[String, MqttQoS]]): MqttSubscriptions = + new MqttSubscriptions(subscriptions.asScala.map(_.toScala).toMap) + + /** Add this subscription to the map of subscriptions configured already. */ + def addSubscription(topic: String, qos: MqttQoS): MqttSubscriptions = + new MqttSubscriptions(this.subscriptions.updated(topic, qos)) +} + +/** + * The mapping of topics to subscribe to and the requested Quality of Service ([[MqttQoS]]) per topic. + */ +object MqttSubscriptions { + val empty = new MqttSubscriptions(Map.empty) + + /** Scala API */ + def apply(subscriptions: Map[String, MqttQoS]): MqttSubscriptions = + new MqttSubscriptions(subscriptions) + + /** Scala API */ + def apply(topic: String, qos: MqttQoS): MqttSubscriptions = + new MqttSubscriptions(Map(topic -> qos)) + + /** Scala API */ + def apply(subscription: (String, MqttQoS)): MqttSubscriptions = + new MqttSubscriptions(Map(subscription)) + + /** Java API */ + def create(subscriptions: java.util.List[pekko.japi.Pair[String, MqttQoS]]): MqttSubscriptions = + new MqttSubscriptions(subscriptions.asScala.map(_.toScala).toMap) + + /** Java API */ + def create(topic: String, qos: MqttQoS): MqttSubscriptions = + new MqttSubscriptions(Map(topic -> qos)) + +} + +private[mqttv5] final case class CleanStartSettings( Review Comment: [info] Everything above is the same as in the `mqtt` submodule; below are a few classes to make the config easier to manage/read. ########## mqttv5/src/test/scala/docs/scaladsl/MqttSinkSpec.scala: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com> + */ + +package docs.scaladsl + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.apache.pekko.Done +import org.apache.pekko.stream.connectors.mqttv5 +import org.apache.pekko.stream.connectors.mqttv5.MqttMessage +import org.apache.pekko.stream.connectors.mqttv5.MqttQoS +import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions +import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSink +import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSource +import org.apache.pekko.stream.scaladsl.Keep +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.util.ByteString +import org.eclipse.paho.mqttv5.common.MqttException + +class MqttSinkSpec extends MqttSpecBase("MqttSinkSpec") { + "mqtt sink" should { + "send one message to a topic" in { + val topic = "v5/sink-spec/topic1" + + val msg = MqttMessage(topic, ByteString("ohi")) + + val (subscribed, message) = MqttSource + .atMostOnce(connectionSettings.withClientId("sink-spec/source1"), + MqttSubscriptions(topic, MqttQoS.AtLeastOnce), + 8) + .toMat(Sink.head)(Keep.both) + .run() + + Await.ready(subscribed, timeout) + Source.single(msg).runWith(MqttSink(connectionSettings.withClientId("sink-spec/sink1"), + MqttQoS.atLeastOnce)) + + message.futureValue shouldBe msg + } + + "send multiple messages to a topic" in { + val topic = "v5/sink-spec/topic2" + + val msg = MqttMessage(topic, ByteString("ohi")) + val numOfMessages = 5 + + val (subscribed, messagesFuture) = + MqttSource + .atMostOnce(connectionSettings.withClientId("sink-spec/source2"), + MqttSubscriptions(topic, MqttQoS.atLeastOnce), + 8) + .take(numOfMessages) + .toMat(Sink.seq)(Keep.both) + .run() + + Await.ready(subscribed, timeout) + Source(1 to numOfMessages).map(_ => msg).runWith( + MqttSink(connectionSettings.withClientId("sink-spec/sink2"), MqttQoS.atLeastOnce)) + + val messages = messagesFuture.futureValue + (messages should have).length(numOfMessages) + messages.foreach { _ shouldBe msg } + } + + "connection should fail to wrong broker" in { + val secureTopic = "v5/sink-spec/secure-topic1" + + val wrongConnectionSettings = + connectionSettings.withClientId("sink-spec/sink3").withBroker("tcp://localhost:1884") + val msg = MqttMessage(secureTopic, ByteString("ohi")) + + val termination = Source + .single(msg) + .runWith(MqttSink(wrongConnectionSettings, MqttQoS.atLeastOnce)) + + termination.failed.futureValue shouldBe an[MqttException] + } + + "fail to publish when credentials are not provided" in { + val secureTopic = "v5/sink-spec/secure-topic2" + val msg = MqttMessage(secureTopic, ByteString("ohi")) + + val termination = + Source.single(msg).runWith(MqttSink(connectionSettings.withClientId("sink-spec/sink4").withAuth( + "username1", "bad_password"), MqttQoS.atLeastOnce)) + + whenReady(termination.failed) { ex => + ex shouldBe an[MqttException] + ex.getMessage should include("Not authorized") + } + } + + "publish when credentials are provided" in { + val secureTopic = "v5/sink-spec/secure-topic3" + val msg = MqttMessage(secureTopic, ByteString("ohi")) + + val (subscribed, message) = MqttSource + .atMostOnce(connectionSettings.withClientId("sink-spec/source1").withAuth("username1", + "password1"), + MqttSubscriptions(secureTopic, MqttQoS.AtLeastOnce), + 8) + .toMat(Sink.head)(Keep.both) + .run() + + Await.ready(subscribed, timeout) + + val termination = Source + .single(msg) + .runWith(MqttSink(connectionSettings.withClientId("sink-spec/sink5").withAuth("username1", + "password1"), MqttQoS.atLeastOnce)) + + termination.futureValue shouldBe Done + + message.futureValue shouldBe msg + } + + "received retained message on new client" in { + val topic = "v5/sink-spec/topic3" + val msg = MqttMessage(topic, ByteString("ohi")).withQos(MqttQoS.atLeastOnce).withRetained(true) + + val messageSent = Source.single(msg).runWith( + MqttSink(connectionSettings.withClientId("sink-spec/sink6"), MqttQoS.atLeastOnce)) + + Await.ready(messageSent, 3.seconds) + + val messageFuture = + MqttSource + .atMostOnce(connectionSettings.withClientId("source-spec/retained"), + mqttv5.MqttSubscriptions(topic, MqttQoS.atLeastOnce), + 8) + .runWith(Sink.head) + + val message = messageFuture.futureValue + message.topic shouldBe msg.topic + message.payload shouldBe msg.payload + } + + "fail to publish to an unauthorized topic" in { Review Comment: [info] The only new test; it triggers the error branch during publishing ########## mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala: ########## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com> + */ + +package org.apache.pekko.stream.connectors.mqttv5.impl + +import java.util.concurrent.Semaphore +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import scala.util.control.NonFatal + +import org.apache.pekko.Done +import org.apache.pekko.annotation.InternalApi +import org.apache.pekko.stream.Shape +import org.apache.pekko.stream._ +import org.apache.pekko.stream.connectors.mqttv5.AuthSettings +import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings +import org.apache.pekko.stream.connectors.mqttv5.MqttMessage +import org.apache.pekko.stream.connectors.mqttv5.MqttOfflinePersistenceSettings +import org.apache.pekko.stream.connectors.mqttv5.MqttQoS +import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck +import org.apache.pekko.stream.stage._ +import org.apache.pekko.util.ByteString +import org.eclipse.paho.mqttv5.client.DisconnectedBufferOptions +import org.eclipse.paho.mqttv5.client.IMqttAsyncClient +import org.eclipse.paho.mqttv5.client.IMqttToken +import org.eclipse.paho.mqttv5.client.MqttActionListener +import org.eclipse.paho.mqttv5.client.MqttAsyncClient +import org.eclipse.paho.mqttv5.client.MqttCallback +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse +import org.eclipse.paho.mqttv5.common.MqttException +import org.eclipse.paho.mqttv5.common.packet.MqttProperties +import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode +import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage } + +/** + * INTERNAL API + */ +@InternalApi +private[mqttv5] final class MqttFlowStage( + connectionSettings: MqttConnectionSettings, + subscriptions: Map[String, MqttQoS], + bufferSize: Int, + defaultQoS: MqttQoS, + manualAcks: Boolean = false +) extends GraphStageWithMaterializedValue[FlowShape[MqttMessage, MqttMessageWithAck], Future[Done]] { + + private val in = Inlet[MqttMessage]("MqttFlow.in") + private val out = Outlet[MqttMessageWithAck]("MqttFlow.out") + override val shape: Shape = FlowShape(in, out) + + override protected def initialAttributes: Attributes = Attributes.name("MqttFlow") + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val subscriptionPromise = Promise[Done]() + val logic = new MqttFlowStageLogic[MqttMessage]( + in = in, + out = out, + shape = shape, + subscriptionPromise = subscriptionPromise, + connectionSettings = connectionSettings, + subscriptions = subscriptions, + bufferSize = bufferSize, + defaultQoS = defaultQoS, + manualAcks = manualAcks + ) { + override def publishPending(msg: MqttMessage): Unit = super.publishToMqtt(msg) + } + (logic, subscriptionPromise.future) + } +} + +abstract class MqttFlowStageLogic[I]( + in: Inlet[I], + out: Outlet[MqttMessageWithAck], + shape: Shape, + subscriptionPromise: Promise[Done], + connectionSettings: MqttConnectionSettings, + subscriptions: Map[String, MqttQoS], + bufferSize: Int, + defaultQoS: MqttQoS, + manualAcks: Boolean +) extends GraphStageLogic(shape) + with StageLogging + with InHandler + with OutHandler { + + import MqttFlowStageLogic._ + + private val backpressurePahoClient = new Semaphore(bufferSize) + private var pendingMsg = Option.empty[I] + private val queue = mutable.Queue[MqttMessageWithAck]() + private val unackedMessages = new AtomicInteger() + + protected def handleDeliveryComplete(token: IMqttToken): Unit = () + + private val onSubscribe: AsyncCallback[Try[IMqttToken]] = getAsyncCallback[Try[IMqttToken]] { conn => + if (subscriptionPromise.isCompleted) { + log.debug( + "Client [{}] re-established subscription to broker [{}]", + connectionSettings.clientId, + connectionSettings.broker + ) + } else { + subscriptionPromise.complete(conn.map(_ => { + log.debug( + "Client [{}] established subscription to broker [{}]", + connectionSettings.clientId, + connectionSettings.broker + ) + Done + })) + pull(in) + } + } + + private val onConnect: AsyncCallback[IMqttAsyncClient] = + getAsyncCallback[IMqttAsyncClient]((client: IMqttAsyncClient) => { + if (subscriptions.nonEmpty) { + if (manualAcks) client.setManualAcks(true) + val (topics, qoses) = subscriptions.unzip + log.debug( + "Client [{}] connected to broker [{}]; subscribing to [{}]", + connectionSettings.clientId, + connectionSettings.broker, + subscriptions.map(sub => s"${sub._1}(qos=${sub._2.value})").mkString(", ") + ) + client.subscribe( + topics.toArray, + qoses.map(_.value).toArray, + /* userContext */ null, + /* callback */ new MqttActionListener { + def onSuccess(token: IMqttToken): Unit = { + token.getReasonCodes.toList.filter(_ >= MqttReturnCode.RETURN_CODE_UNSPECIFIED_ERROR).distinct match { Review Comment: [info] Since v5 now provides more return codes, here we check and fail the stage if something went wrong. ########## mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/settings.scala: ########## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com> + */ + +package org.apache.pekko.stream.connectors.mqttv5 + +import java.nio.charset.StandardCharsets +import java.util.Properties + +import scala.collection.immutable +import scala.concurrent.duration._ + +import org.apache.pekko +import org.apache.pekko.japi.Pair +import org.apache.pekko.util.JavaDurationConverters._ +import org.apache.pekko.util.ccompat.JavaConverters._ +import org.eclipse.paho.mqttv5.client.MqttClientPersistence +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions +import org.eclipse.paho.mqttv5.common.packet.MqttProperties +import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage } + +/** + * Quality of Service constants as defined in + * [[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901234]] + */ +sealed abstract class MqttQoS { + def value: Int +} + +/** + * Quality of Service constants as defined in + * [[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901234]] + */ +object MqttQoS { + + /** + * Quality of Service 0 - indicates that a message should be delivered at most once (zero or one times). The message + * will not be persisted to disk, and will not be acknowledged across the network. This QoS is the fastest, but should + * only be used for messages which are not valuable. + */ + object AtMostOnce extends MqttQoS { + val value: Int = 0 + } + + /** + * Quality of Service 1 - indicates that a message should be delivered at least once (one or more times). The message + * can only be delivered safely if it can be persisted, so the application must supply a means of persistence using + * [[MqttConnectionSettings]]. If a persistence mechanism is not specified, the message will not be delivered in the + * event of a client failure. The message will be acknowledged across the network. + */ + object AtLeastOnce extends MqttQoS { + val value: Int = 1 + } + + /** + * Quality of Service 2 - indicates that a message should be delivered once. The message will be persisted to disk, + * and will be subject to a two-phase acknowledgement across the network. The message can only be delivered safely + * if it can be persisted, so the application must supply a means of persistence using [[MqttConnectionSettings]]. + * If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. + */ + object ExactlyOnce extends MqttQoS { + val value: Int = 2 + } + + /** + * Java API + * + * Quality of Service 0 - indicates that a message should be delivered at most once (zero or one times). The message + * will not be persisted to disk, and will not be acknowledged across the network. This QoS is the fastest, but should + * only be used for messages which are not valuable. + */ + def atMostOnce: MqttQoS = AtMostOnce + + /** + * Java API + * + * Quality of Service 1 - indicates that a message should be delivered at least once (one or more times). The message + * can only be delivered safely if it can be persisted, so the application must supply a means of persistence using + * [[MqttConnectionSettings]]. If a persistence mechanism is not specified, the message will not be delivered in the + * event of a client failure. The message will be acknowledged across the network. + */ + def atLeastOnce: MqttQoS = AtLeastOnce + + /** + * Java API + * + * Quality of Service 2 - indicates that a message should be delivered once. The message will be persisted to disk, + * and will be subject to a two-phase acknowledgement across the network. The message can only be delivered safely + * if it can be persisted, so the application must supply a means of persistence using [[MqttConnectionSettings]]. + * If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. + */ + def exactlyOnce: MqttQoS = ExactlyOnce +} + +/** + * The mapping of topics to subscribe to and the requested Quality of Service ([[MqttQoS]]) per topic. + */ +final class MqttSubscriptions private ( + val subscriptions: Map[String, MqttQoS]) { + + /** Scala API */ + def withSubscriptions(subscriptions: Map[String, MqttQoS]): MqttSubscriptions = + new MqttSubscriptions(subscriptions) + + /** Java API */ + def withSubscriptions(subscriptions: java.util.List[pekko.japi.Pair[String, MqttQoS]]): MqttSubscriptions = + new MqttSubscriptions(subscriptions.asScala.map(_.toScala).toMap) + + /** Add this subscription to the map of subscriptions configured already. */ + def addSubscription(topic: String, qos: MqttQoS): MqttSubscriptions = + new MqttSubscriptions(this.subscriptions.updated(topic, qos)) +} + +/** + * The mapping of topics to subscribe to and the requested Quality of Service ([[MqttQoS]]) per topic. + */ +object MqttSubscriptions { + val empty = new MqttSubscriptions(Map.empty) + + /** Scala API */ + def apply(subscriptions: Map[String, MqttQoS]): MqttSubscriptions = + new MqttSubscriptions(subscriptions) + + /** Scala API */ + def apply(topic: String, qos: MqttQoS): MqttSubscriptions = + new MqttSubscriptions(Map(topic -> qos)) + + /** Scala API */ + def apply(subscription: (String, MqttQoS)): MqttSubscriptions = + new MqttSubscriptions(Map(subscription)) + + /** Java API */ + def create(subscriptions: java.util.List[pekko.japi.Pair[String, MqttQoS]]): MqttSubscriptions = + new MqttSubscriptions(subscriptions.asScala.map(_.toScala).toMap) + + /** Java API */ + def create(topic: String, qos: MqttQoS): MqttSubscriptions = + new MqttSubscriptions(Map(topic -> qos)) + +} + +private[mqttv5] final case class CleanStartSettings( + enabled: Boolean, + sessionExpiration: Option[FiniteDuration] +) + +private[mqttv5] final case class DisconnectSettings( + quiesceTimeout: FiniteDuration, + timeout: FiniteDuration, + sendDisconnectPacket: Boolean +) + +private[mqttv5] sealed trait AuthSettings { + def asString: String +} + +private[mqttv5] object AuthSettings { + case object Disabled extends AuthSettings { + override lazy val asString: String = "Disabled" + } + + final case class Simple( + username: String, + password: String + ) extends AuthSettings { + override lazy val asString: String = s"Simple(username=$username)" + } + + final case class Enhanced( + method: String, + initialData: Array[Byte], + authPacketHandler: (Int, MqttProperties) => (Int, MqttProperties) + ) extends AuthSettings { + override lazy val asString: String = s"Enhanced(method=$method)" + } +} + +private[mqttv5] final case class MqttOfflinePersistenceSettings( + bufferSize: Int = 5000, + deleteOldestMessage: Boolean = false, + persistBuffer: Boolean = true +) + +/** + * Connection settings passed to the underlying Paho client. + * + * Java docs for `MqttConnectionOptions` are not available; + * see [[https://github.com/eclipse-paho/paho.mqtt.java/issues/1012]] or + * [[org.eclipse.paho.mqttv5.client.MqttConnectionOptions]] for more info + */ +final class MqttConnectionSettings private ( + val broker: String, + val clientId: String, + val persistence: MqttClientPersistence, + val cleanStart: CleanStartSettings, + val disconnect: DisconnectSettings, + val auth: AuthSettings, + val offlinePersistence: Option[MqttOfflinePersistenceSettings], + val automaticReconnect: Boolean, + val keepAliveInterval: FiniteDuration, + val connectionTimeout: FiniteDuration, + val serverUris: Array[String], + val will: Option[MqttMessage], + val sslProperties: Map[String, String], + val socketFactory: Option[javax.net.ssl.SSLSocketFactory], + val sslHostnameVerifier: Option[javax.net.ssl.HostnameVerifier] +) { + def asMqttConnectionOptions(): MqttConnectionOptions = { Review Comment: [info] The above settings are converted to the `MqttConnectionOptions` expected by the client; this was previously done in `MqttFlowStage` but I think it's easier to understand how it's all configured if it's here. As a side note, I wanted to make this a `lazy val` so that library users could potentially update the options on their own (since the resulting object is mutable) but I'm not sure how this will all work when used from Java code. ########## mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala: ########## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com> + */ + +package org.apache.pekko.stream.connectors.mqttv5.impl + +import java.util.concurrent.Semaphore +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import scala.util.control.NonFatal + +import org.apache.pekko.Done +import org.apache.pekko.annotation.InternalApi +import org.apache.pekko.stream.Shape +import org.apache.pekko.stream._ +import org.apache.pekko.stream.connectors.mqttv5.AuthSettings +import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings +import org.apache.pekko.stream.connectors.mqttv5.MqttMessage +import org.apache.pekko.stream.connectors.mqttv5.MqttOfflinePersistenceSettings +import org.apache.pekko.stream.connectors.mqttv5.MqttQoS +import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck +import org.apache.pekko.stream.stage._ +import org.apache.pekko.util.ByteString +import org.eclipse.paho.mqttv5.client.DisconnectedBufferOptions +import org.eclipse.paho.mqttv5.client.IMqttAsyncClient +import org.eclipse.paho.mqttv5.client.IMqttToken +import org.eclipse.paho.mqttv5.client.MqttActionListener +import org.eclipse.paho.mqttv5.client.MqttAsyncClient +import org.eclipse.paho.mqttv5.client.MqttCallback +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse +import org.eclipse.paho.mqttv5.common.MqttException +import org.eclipse.paho.mqttv5.common.packet.MqttProperties +import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode +import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage } + +/** + * INTERNAL API + */ +@InternalApi +private[mqttv5] final class MqttFlowStage( + connectionSettings: MqttConnectionSettings, + subscriptions: Map[String, MqttQoS], + bufferSize: Int, + defaultQoS: MqttQoS, + manualAcks: Boolean = false +) extends GraphStageWithMaterializedValue[FlowShape[MqttMessage, MqttMessageWithAck], Future[Done]] { + + private val in = Inlet[MqttMessage]("MqttFlow.in") + private val out = Outlet[MqttMessageWithAck]("MqttFlow.out") + override val shape: Shape = FlowShape(in, out) + + override protected def initialAttributes: Attributes = Attributes.name("MqttFlow") + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val subscriptionPromise = Promise[Done]() + val logic = new MqttFlowStageLogic[MqttMessage]( + in = in, + out = out, + shape = shape, + subscriptionPromise = subscriptionPromise, + connectionSettings = connectionSettings, + subscriptions = subscriptions, + bufferSize = bufferSize, + defaultQoS = defaultQoS, + manualAcks = manualAcks + ) { + override def publishPending(msg: MqttMessage): Unit = super.publishToMqtt(msg) + } + (logic, subscriptionPromise.future) + } +} + +abstract class MqttFlowStageLogic[I]( + in: Inlet[I], + out: Outlet[MqttMessageWithAck], + shape: Shape, + subscriptionPromise: Promise[Done], + connectionSettings: MqttConnectionSettings, + subscriptions: Map[String, MqttQoS], + bufferSize: Int, + defaultQoS: MqttQoS, + manualAcks: Boolean +) extends GraphStageLogic(shape) + with StageLogging + with InHandler + with OutHandler { + + import MqttFlowStageLogic._ + + private val backpressurePahoClient = new Semaphore(bufferSize) + private var pendingMsg = Option.empty[I] + private val queue = mutable.Queue[MqttMessageWithAck]() + private val unackedMessages = new AtomicInteger() + + protected def handleDeliveryComplete(token: IMqttToken): Unit = () + + private val onSubscribe: AsyncCallback[Try[IMqttToken]] = getAsyncCallback[Try[IMqttToken]] { conn => + if (subscriptionPromise.isCompleted) { + log.debug( + "Client [{}] re-established subscription to broker [{}]", + connectionSettings.clientId, + connectionSettings.broker + ) + } else { + subscriptionPromise.complete(conn.map(_ => { + log.debug( + "Client [{}] established subscription to broker [{}]", + connectionSettings.clientId, + connectionSettings.broker + ) + Done + })) + pull(in) + } + } + + private val onConnect: AsyncCallback[IMqttAsyncClient] = + getAsyncCallback[IMqttAsyncClient]((client: IMqttAsyncClient) => { + if (subscriptions.nonEmpty) { + if (manualAcks) client.setManualAcks(true) + val (topics, qoses) = subscriptions.unzip + log.debug( + "Client [{}] connected to broker [{}]; subscribing to [{}]", + connectionSettings.clientId, + connectionSettings.broker, + subscriptions.map(sub => s"${sub._1}(qos=${sub._2.value})").mkString(", ") + ) + client.subscribe( + topics.toArray, + qoses.map(_.value).toArray, + /* userContext */ null, + /* callback */ new MqttActionListener { + def onSuccess(token: IMqttToken): Unit = { + token.getReasonCodes.toList.filter(_ >= MqttReturnCode.RETURN_CODE_UNSPECIFIED_ERROR).distinct match { + case Nil => + onSubscribe.invoke(Success(token)) + + case errors => + val message = s"Client [${connectionSettings.clientId}] received one or more errors " + + s"while subscribing to broker [${connectionSettings.broker}]: " + + s"[${errors.map(e => s"code=${e.toString}").mkString(",")}]" + log.error(message) + onSubscribe.invoke(Failure(new RuntimeException(message))) + } + } + + def onFailure(token: IMqttToken, ex: Throwable): Unit = + onSubscribe.invoke(Failure(ex)) + } + ) + } else { + log.debug( + "Client [{}] connected to broker [{}] without subscriptions", + connectionSettings.clientId, + connectionSettings.broker + ) + subscriptionPromise.complete(SuccessfullyDone) + pull(in) + } + }) + + private val onConnectionLost: AsyncCallback[Throwable] = getAsyncCallback[Throwable](failStageWith) + + private val onMessageAsyncCallback: AsyncCallback[MqttMessageWithAck] = + getAsyncCallback[MqttMessageWithAck] { message => + if (isAvailable(out)) { + pushDownstream(message) + } else if (queue.size + 1 > bufferSize) { + failStageWith(new RuntimeException(s"Reached maximum buffer size [$bufferSize]")) + } else { + queue.enqueue(message) + } + } + + private val onPublished: AsyncCallback[Try[IMqttToken]] = getAsyncCallback[Try[IMqttToken]] { + case Success(_) => if (!hasBeenPulled(in)) pull(in) + case Failure(ex) => failStageWith(ex) + } + + private def createPahoBufferOptions(settings: MqttOfflinePersistenceSettings): DisconnectedBufferOptions = { + val disconnectedBufferOptions = new DisconnectedBufferOptions() + + disconnectedBufferOptions.setBufferEnabled(true) + disconnectedBufferOptions.setBufferSize(settings.bufferSize) + disconnectedBufferOptions.setDeleteOldestMessages(settings.deleteOldestMessage) + disconnectedBufferOptions.setPersistBuffer(settings.persistBuffer) + + disconnectedBufferOptions + } + + private val client = new MqttAsyncClient( + connectionSettings.broker, + connectionSettings.clientId, + connectionSettings.persistence + ) + + private def mqttClient: MqttAsyncClient = connectionSettings.offlinePersistence match { + case Some(bufferOpts) => + client.setBufferOpts(createPahoBufferOptions(bufferOpts)) + client + + case _ => + client + } + + private val commitCallback: AsyncCallback[CommitCallbackArguments] = + getAsyncCallback[CommitCallbackArguments]((args: CommitCallbackArguments) => + try { + mqttClient.messageArrivedComplete(args.messageId, args.qos.value) + if (unackedMessages.decrementAndGet() == 0 && (isClosed(out) || (isClosed(in) && queue.isEmpty))) + completeStage() + args.promise.complete(SuccessfullyDone) + } catch { + case ex: Throwable => args.promise.failure(ex) + } + ) + + mqttClient.setCallback( + new MqttCallback { + override def messageArrived(topic: String, pahoMessage: PahoMqttMessage): Unit = { + backpressurePahoClient.acquire() + val message = new MqttMessageWithAck { + override val message: MqttMessage = MqttMessage(topic, ByteString.fromArrayUnsafe(pahoMessage.getPayload)) + + override def ack(): Future[Done] = { + val promise = Promise[Done]() + val qos = pahoMessage.getQos match { + case 0 => MqttQoS.AtMostOnce + case 1 => MqttQoS.AtLeastOnce + case 2 => MqttQoS.ExactlyOnce + } + commitCallback.invoke(CommitCallbackArguments(pahoMessage.getId, qos, promise)) + promise.future + } + } + onMessageAsyncCallback.invoke(message) + } + + override def deliveryComplete(token: IMqttToken): Unit = + handleDeliveryComplete(token) + + override def disconnected(disconnectResponse: MqttDisconnectResponse): Unit = { + if (!connectionSettings.automaticReconnect) { + log.error( + "Client [{}] lost connection to broker [{}] with [code={},reason={}]; " + + "(hint: `automaticReconnect` can be enabled in `MqttConnectionSettings`)", + connectionSettings.clientId, + connectionSettings.broker, + disconnectResponse.getReturnCode, + disconnectResponse.getReasonString + ) + onConnectionLost.invoke(disconnectResponse.getException) + } else { + log.warning( + "Client [{}] lost connection to broker [{}] with [code={},reason={}]; trying to reconnect...", + connectionSettings.clientId, + connectionSettings.broker, + disconnectResponse.getReturnCode, + disconnectResponse.getReasonString + ) + } + } + + override def mqttErrorOccurred(exception: MqttException): Unit = + failStageWith(exception) + + override def authPacketArrived(reasonCode: Int, properties: MqttProperties): Unit = { + connectionSettings.auth match { + case AuthSettings.Enhanced(_, _, _) if reasonCode == 0x00 => + // (re)authentication successful; no further action needed + log.debug( + "Authentication for client [{}] completed successfully with [codes={},reason={}]", + connectionSettings.clientId, + reasonCode, + properties.getReasonString + ) + + case AuthSettings.Enhanced(_, _, authPacketHandler) if reasonCode == 0x18 => + // continue authentication + log.debug( + "Authentication for client [{}] continuing with [codes={},reason={}]", + connectionSettings.clientId, + reasonCode, + properties.getReasonString + ) + + val (responseCode, responseProperties) = authPacketHandler(reasonCode, properties) + + val result = mqttClient.authenticate( + /* reasonCode */ responseCode, + /* userContext */ null, + /* properties */ responseProperties + ) + + Option(result).foreach { token => + // the API docs say that a token for the operation is returned but the current + // implementation in `org.eclipse.paho.mqttv5.client.MqttAsyncClient` actually + // returns `null`; in case this changes in the future, here we log the results + // of the operation and hope for the best + token.setActionCallback( + new MqttActionListener { + override def onSuccess(token: IMqttToken): Unit = + log.debug( + "Authentication call for client [{}] completed with [codes={},reason={}]", + connectionSettings.clientId, + token.getReasonCodes.distinct.sorted.mkString(";"), + token.getResponseProperties.getReasonString + ) + + override def onFailure(token: IMqttToken, ex: Throwable): Unit = + log.debug( + "Authentication call for client [{}] failed with [codes={},reason={}]: [{}]", + connectionSettings.clientId, + token.getReasonCodes.distinct.sorted.mkString(";"), + token.getResponseProperties.getReasonString, + s"${ex.getClass.getSimpleName} - ${ex.getMessage}" + ) + } + ) + } + + case other => + // unexpected reason code received (if enhanced authentication is used) + // OR + // unexpected AUTH packet received (if no or simple authentication is used) + log.warning( + "Client [{}] with authentication type [{}] received an unexpected AUTH packet with [code={},reason={}]", + connectionSettings.clientId, + other.getClass.getSimpleName.replaceAll("[^a-zA-Z0-9]", "").toLowerCase, + reasonCode, + properties.getReasonString + ) + } + } + + override def connectComplete(reconnect: Boolean, serverURI: String): Unit = { + log.debug( + "Connection completed for client [{}] with [reconnect={},serverURI={}]", + connectionSettings.clientId, + reconnect, + serverURI + ) + pendingMsg.foreach { msg => + log.debug("Client [{}] sending pending message to broker [{}]", connectionSettings.clientId, serverURI) + publishPending(msg) + pendingMsg = None + } + if (reconnect && !hasBeenPulled(in)) pull(in) + } + } + ) + + override def onPush(): Unit = { + val msg = grab(in) + try { + publishPending(msg) + } catch { + case _: MqttException if connectionSettings.automaticReconnect => pendingMsg = Some(msg) + case NonFatal(e) => throw e + } + } + + override def onUpstreamFinish(): Unit = { + setKeepGoing(true) + if (queue.isEmpty && unackedMessages.get() == 0) super.onUpstreamFinish() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + setKeepGoing(true) + if (queue.isEmpty && unackedMessages.get() == 0) super.onUpstreamFailure(ex) + } + + override def onPull(): Unit = + if (queue.nonEmpty) { + pushDownstream(queue.dequeue()) + if (unackedMessages.get() == 0 && isClosed(in)) completeStage() + } + + override def onDownstreamFinish(cause: Throwable): Unit = { + setKeepGoing(true) + if (unackedMessages.get() == 0) super.onDownstreamFinish(cause) + } + + setHandlers(in, out, this) + + def publishToMqtt(msg: MqttMessage): IMqttToken = { + val pahoMsg = new PahoMqttMessage(msg.payload.toArray) + pahoMsg.setQos(msg.qos.getOrElse(defaultQoS).value) + pahoMsg.setRetained(msg.retained) + + mqttClient.publish( + msg.topic, + pahoMsg, + msg, + /* callback */ new MqttActionListener { + def onSuccess(token: IMqttToken): Unit = { + token.getReasonCodes.toList.filter(_ >= MqttReturnCode.RETURN_CODE_UNSPECIFIED_ERROR).distinct match { Review Comment: [info] As with the subscription above, the return codes are checked and the stage is failed if something went wrong. ########## mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala: ########## @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com> + */ + +package org.apache.pekko.stream.connectors.mqttv5.impl + +import java.util.concurrent.Semaphore +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import scala.util.control.NonFatal + +import org.apache.pekko.Done +import org.apache.pekko.annotation.InternalApi +import org.apache.pekko.stream.Shape +import org.apache.pekko.stream._ +import org.apache.pekko.stream.connectors.mqttv5.AuthSettings +import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings +import org.apache.pekko.stream.connectors.mqttv5.MqttMessage +import org.apache.pekko.stream.connectors.mqttv5.MqttOfflinePersistenceSettings +import org.apache.pekko.stream.connectors.mqttv5.MqttQoS +import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck +import org.apache.pekko.stream.stage._ +import org.apache.pekko.util.ByteString +import org.eclipse.paho.mqttv5.client.DisconnectedBufferOptions +import org.eclipse.paho.mqttv5.client.IMqttAsyncClient +import org.eclipse.paho.mqttv5.client.IMqttToken +import org.eclipse.paho.mqttv5.client.MqttActionListener +import org.eclipse.paho.mqttv5.client.MqttAsyncClient +import org.eclipse.paho.mqttv5.client.MqttCallback +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse +import org.eclipse.paho.mqttv5.common.MqttException +import org.eclipse.paho.mqttv5.common.packet.MqttProperties +import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode +import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage } + +/** + * INTERNAL API + */ +@InternalApi +private[mqttv5] final class MqttFlowStage( + connectionSettings: MqttConnectionSettings, + subscriptions: Map[String, MqttQoS], + bufferSize: Int, + defaultQoS: MqttQoS, + manualAcks: Boolean = false +) extends GraphStageWithMaterializedValue[FlowShape[MqttMessage, MqttMessageWithAck], Future[Done]] { + + private val in = Inlet[MqttMessage]("MqttFlow.in") + private val out = Outlet[MqttMessageWithAck]("MqttFlow.out") + override val shape: Shape = FlowShape(in, out) + + override protected def initialAttributes: Attributes = Attributes.name("MqttFlow") + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val subscriptionPromise = Promise[Done]() + val logic = new MqttFlowStageLogic[MqttMessage]( + in = in, + out = out, + shape = shape, + subscriptionPromise = subscriptionPromise, + connectionSettings = connectionSettings, + subscriptions = subscriptions, + bufferSize = bufferSize, + defaultQoS = defaultQoS, + manualAcks = manualAcks + ) { + override def publishPending(msg: MqttMessage): Unit = super.publishToMqtt(msg) + } + (logic, subscriptionPromise.future) + } +} + +abstract class MqttFlowStageLogic[I]( + in: Inlet[I], + out: Outlet[MqttMessageWithAck], + shape: Shape, + subscriptionPromise: Promise[Done], + connectionSettings: MqttConnectionSettings, + subscriptions: Map[String, MqttQoS], + bufferSize: Int, + defaultQoS: MqttQoS, + manualAcks: Boolean +) extends GraphStageLogic(shape) + with StageLogging + with InHandler + with OutHandler { + + import MqttFlowStageLogic._ + + private val backpressurePahoClient = new Semaphore(bufferSize) + private var pendingMsg = Option.empty[I] + private val queue = mutable.Queue[MqttMessageWithAck]() + private val unackedMessages = new AtomicInteger() + + protected def handleDeliveryComplete(token: IMqttToken): Unit = () + + private val onSubscribe: AsyncCallback[Try[IMqttToken]] = getAsyncCallback[Try[IMqttToken]] { conn => + if (subscriptionPromise.isCompleted) { + log.debug( + "Client [{}] re-established subscription to broker [{}]", + connectionSettings.clientId, + connectionSettings.broker + ) + } else { + subscriptionPromise.complete(conn.map(_ => { + log.debug( + "Client [{}] established subscription to broker [{}]", + connectionSettings.clientId, + connectionSettings.broker + ) + Done + })) + pull(in) + } + } + + private val onConnect: AsyncCallback[IMqttAsyncClient] = + getAsyncCallback[IMqttAsyncClient]((client: IMqttAsyncClient) => { + if (subscriptions.nonEmpty) { + if (manualAcks) client.setManualAcks(true) + val (topics, qoses) = subscriptions.unzip + log.debug( + "Client [{}] connected to broker [{}]; subscribing to [{}]", + connectionSettings.clientId, + connectionSettings.broker, + subscriptions.map(sub => s"${sub._1}(qos=${sub._2.value})").mkString(", ") + ) + client.subscribe( + topics.toArray, + qoses.map(_.value).toArray, + /* userContext */ null, + /* callback */ new MqttActionListener { + def onSuccess(token: IMqttToken): Unit = { + token.getReasonCodes.toList.filter(_ >= MqttReturnCode.RETURN_CODE_UNSPECIFIED_ERROR).distinct match { + case Nil => + onSubscribe.invoke(Success(token)) + + case errors => + val message = s"Client [${connectionSettings.clientId}] received one or more errors " + + s"while subscribing to broker [${connectionSettings.broker}]: " + + s"[${errors.map(e => s"code=${e.toString}").mkString(",")}]" + log.error(message) + onSubscribe.invoke(Failure(new RuntimeException(message))) + } + } + + def onFailure(token: IMqttToken, ex: Throwable): Unit = + onSubscribe.invoke(Failure(ex)) + } + ) + } else { + log.debug( + "Client [{}] connected to broker [{}] without subscriptions", + connectionSettings.clientId, + connectionSettings.broker + ) + subscriptionPromise.complete(SuccessfullyDone) + pull(in) + } + }) + + private val onConnectionLost: AsyncCallback[Throwable] = getAsyncCallback[Throwable](failStageWith) + + private val onMessageAsyncCallback: AsyncCallback[MqttMessageWithAck] = + getAsyncCallback[MqttMessageWithAck] { message => + if (isAvailable(out)) { + pushDownstream(message) + } else if (queue.size + 1 > bufferSize) { + failStageWith(new RuntimeException(s"Reached maximum buffer size [$bufferSize]")) + } else { + queue.enqueue(message) + } + } + + private val onPublished: AsyncCallback[Try[IMqttToken]] = getAsyncCallback[Try[IMqttToken]] { + case Success(_) => if (!hasBeenPulled(in)) pull(in) + case Failure(ex) => failStageWith(ex) + } + + private def createPahoBufferOptions(settings: MqttOfflinePersistenceSettings): DisconnectedBufferOptions = { + val disconnectedBufferOptions = new DisconnectedBufferOptions() + + disconnectedBufferOptions.setBufferEnabled(true) + disconnectedBufferOptions.setBufferSize(settings.bufferSize) + disconnectedBufferOptions.setDeleteOldestMessages(settings.deleteOldestMessage) + disconnectedBufferOptions.setPersistBuffer(settings.persistBuffer) + + disconnectedBufferOptions + } + + private val client = new MqttAsyncClient( + connectionSettings.broker, + connectionSettings.clientId, + connectionSettings.persistence + ) + + private def mqttClient: MqttAsyncClient = connectionSettings.offlinePersistence match { + case Some(bufferOpts) => + client.setBufferOpts(createPahoBufferOptions(bufferOpts)) + client + + case _ => + client + } + + private val commitCallback: AsyncCallback[CommitCallbackArguments] = + getAsyncCallback[CommitCallbackArguments]((args: CommitCallbackArguments) => + try { + mqttClient.messageArrivedComplete(args.messageId, args.qos.value) + if (unackedMessages.decrementAndGet() == 0 && (isClosed(out) || (isClosed(in) && queue.isEmpty))) + completeStage() + args.promise.complete(SuccessfullyDone) + } catch { + case ex: Throwable => args.promise.failure(ex) + } + ) + + mqttClient.setCallback( + new MqttCallback { + override def messageArrived(topic: String, pahoMessage: PahoMqttMessage): Unit = { + backpressurePahoClient.acquire() + val message = new MqttMessageWithAck { + override val message: MqttMessage = MqttMessage(topic, ByteString.fromArrayUnsafe(pahoMessage.getPayload)) + + override def ack(): Future[Done] = { + val promise = Promise[Done]() + val qos = pahoMessage.getQos match { + case 0 => MqttQoS.AtMostOnce + case 1 => MqttQoS.AtLeastOnce + case 2 => MqttQoS.ExactlyOnce + } + commitCallback.invoke(CommitCallbackArguments(pahoMessage.getId, qos, promise)) + promise.future + } + } + onMessageAsyncCallback.invoke(message) + } + + override def deliveryComplete(token: IMqttToken): Unit = + handleDeliveryComplete(token) + + override def disconnected(disconnectResponse: MqttDisconnectResponse): Unit = { + if (!connectionSettings.automaticReconnect) { + log.error( + "Client [{}] lost connection to broker [{}] with [code={},reason={}]; " + + "(hint: `automaticReconnect` can be enabled in `MqttConnectionSettings`)", + connectionSettings.clientId, + connectionSettings.broker, + disconnectResponse.getReturnCode, + disconnectResponse.getReasonString + ) + onConnectionLost.invoke(disconnectResponse.getException) + } else { + log.warning( + "Client [{}] lost connection to broker [{}] with [code={},reason={}]; trying to reconnect...", + connectionSettings.clientId, + connectionSettings.broker, + disconnectResponse.getReturnCode, + disconnectResponse.getReasonString + ) + } + } + + override def mqttErrorOccurred(exception: MqttException): Unit = + failStageWith(exception) + + override def authPacketArrived(reasonCode: Int, properties: MqttProperties): Unit = { Review Comment: [info] This is the biggest chunk of new code - handling the `AUTH` packets; it's based on the reason codes provided [here](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901220) but since I couldn't find a reasonable way to get an enhanced auth plugin working for mosquitto, it's all untested. The idea is that for enhanced authentication, `AUTH` packets with reason code `0x18` will keep arriving until the process is complete; for each one, the `authPacketHandler` will be called and it's up the library user to provide something that makes sense for their protocol. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For additional commands, e-mail: notifications-h...@pekko.apache.org