This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 579e2de947 Port akka-core PR #31906 and #31926: DnsResolutionActor and 
in-flight deduplication (#2919)
579e2de947 is described below

commit 579e2de947385da4ed6bca4e6b98e031385ce765
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 27 10:30:33 2026 +0200

    Port akka-core PR #31906 and #31926: DnsResolutionActor and in-flight 
deduplication (#2919)
    
    * feat: port akka-core PR #31906 and #31926 - DnsResolutionActor and 
in-flight dedup
    
    - Replace future-based resolution in AsyncDnsResolver with per-request
      DnsResolutionActor actors (ported from akka-core PR #31906)
    - Add in-flight deduplication: concurrent resolves for same (name, mode)
      share a single DNS lookup
    - DnsResolutionActor handles search domains, nameserver failover, and
      DuplicateId retries using pekko's existing IdGenerator
    - ResolutionAnswer message routes results through AsyncDnsResolver
      so it can update cache and notify all waiting senders
    - Port three new tests from akka-core PR #31926:
      - 'attempt to drop a failed question on timeout'
      - 'not reuse the request ids of pending requests'
      - 'reuse in-progress resolutions'
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/dc5393cc-d827-4840-85f5-07f67c5142ac
    
    Co-authored-by: pjfanning <[email protected]>
    
    * refactor: address code review - fix double map lookup, improve comment
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/dc5393cc-d827-4840-85f5-07f67c5142ac
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../io/dns/internal/AsyncDnsResolverSpec.scala     |  55 +++-
 .../pekko/io/dns/internal/AsyncDnsResolver.scala   | 347 ++++++++++++---------
 2 files changed, 261 insertions(+), 141 deletions(-)

diff --git 
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
 
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
index 51b51d88eb..a3dfb031ac 100644
--- 
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
+++ 
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
@@ -26,7 +26,7 @@ import pekko.io.dns.{ AAAARecord, ARecord, DnsSettings, 
IdGenerator, SRVRecord }
 import pekko.io.dns.CachePolicy.Ttl
 import pekko.io.dns.DnsProtocol._
 import pekko.io.dns.internal.AsyncDnsResolver.ResolveFailedException
-import pekko.io.dns.internal.DnsClient.{ Answer, DuplicateId, Question4, 
Question6, SrvQuestion }
+import pekko.io.dns.internal.DnsClient.{ Answer, DropRequest, DuplicateId, 
Question4, Question6, SrvQuestion }
 import pekko.testkit.{ PekkoSpec, TestProbe, WithLogCapturing }
 
 import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory }
@@ -300,6 +300,59 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
 
       senderProbe.expectMsg(Resolved("cats.com", im.Seq(ipv4Record)))
     }
+
+    "attempt to drop a failed question on timeout" in new Setup {
+      val configWithExtraShortTimeout =
+        defaultConfig.withValue("resolve-timeout", 
ConfigValueFactory.fromAnyRef("1 ms"))
+      override val r = resolver(List(dnsClient1.ref), 
configWithExtraShortTimeout)
+
+      r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false))
+      val q4 = dnsClient1.expectMsgPF() {
+        case q: Question4 if q.name == "cats.com" => q
+      }
+      // the ask times out because dnsClient1 never replies; the DnsResolutionActor should then drop 
the pending question
+      dnsClient1.expectMsgPF(remainingOrDefault) {
+        case DropRequest(dropped) if dropped == q4 =>
+      }
+    }
+
+    "not reuse the request ids of pending requests" in new Setup {
+      // Send multiple resolves for different names so no in-flight 
deduplication applies
+      val resolveCount = 10
+      (1 to resolveCount).foreach { i =>
+        r.tell(Resolve(s"host$i.cats.com", Ip(ipv4 = true, ipv6 = false)), 
senderProbe.ref)
+      }
+
+      // Each resolve should have caused dnsClient1 to receive a Question4 with a 
unique ID
+      val receivedIds = (1 to resolveCount).map { _ =>
+        dnsClient1.expectMsgPF(remainingOrDefault) {
+          case Question4(id, _) => id
+        }
+      }
+      receivedIds.toSet.size shouldBe resolveCount
+    }
+
+    "reuse in-progress resolutions" in new Setup {
+      val asker1 = TestProbe()
+      val asker2 = TestProbe()
+
+      override val r = resolver(List(dnsClient1.ref), defaultConfig)
+
+      val resolve = Resolve("cats.com", Ip(ipv4 = true, ipv6 = false))
+
+      r.tell(resolve, asker1.ref)
+      val firstId = dnsClient1.expectMsgPF() {
+        case q4: Question4 if q4.name == "cats.com" => q4.id
+      }
+      // Send a second identical resolve while the first is still pending
+      r.tell(resolve, asker2.ref)
+      // No second DNS question should be sent; the second resolve reuses the 
in-progress one
+      dnsClient1.expectNoMessage(50.millis)
+      dnsClient1.reply(Answer(firstId, Nil))
+
+      asker1.expectMsg(Resolved("cats.com", im.Seq.empty))
+      asker2.expectMsg(Resolved("cats.com", im.Seq.empty))
+    }
   }
 
   def resolver(clients: List[ActorRef], config: Config): ActorRef = {
diff --git 
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala 
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
index aced67214c..7804a74784 100644
--- 
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
+++ 
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
@@ -16,15 +16,12 @@ package org.apache.pekko.io.dns.internal
 import java.net.{ Inet4Address, Inet6Address, InetAddress, InetSocketAddress }
 
 import scala.collection.immutable
-import scala.concurrent.ExecutionContextExecutor
-import scala.concurrent.Future
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
-import scala.util.control.NonFatal
+import scala.concurrent.{ ExecutionContextExecutor, Future }
+import scala.concurrent.ExecutionContext.parasitic
+import scala.util.{ Failure, Success, Try }
 
 import org.apache.pekko
-import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorRefFactory }
+import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorRefFactory, Props, 
Status }
 import pekko.annotation.InternalApi
 import pekko.io.SimpleDnsCache
 import pekko.io.dns._
@@ -56,8 +53,6 @@ private[io] final class AsyncDnsResolver(
 
   import AsyncDnsResolver._
 
-  implicit val ec: ExecutionContextExecutor = context.dispatcher
-
   // avoid ever looking up localhost by pre-populating cache
   {
     val loopback = InetAddress.getLoopbackAddress
@@ -81,9 +76,6 @@ private[io] final class AsyncDnsResolver(
 
   }
 
-  // For ask to DNS Client
-  implicit val timeout: Timeout = Timeout(settings.ResolveTimeout)
-
   val nameServers = settings.NameServers
 
   val positiveCachePolicy = settings.PositiveCachePolicy
@@ -96,6 +88,9 @@ private[io] final class AsyncDnsResolver(
 
   private val resolvers: List[ActorRef] = clientFactory(context, nameServers)
 
+  // tracks in-flight resolutions by (name, requestType) -> list of senders 
waiting for the result
+  private var inFlight: Map[(String, RequestType), List[ActorRef]] = Map.empty
+
   // only supports DnsProtocol, not the deprecated Dns protocol
   // AsyncDnsManager converts between the protocols to support the deprecated 
protocol
   override def receive: Receive = {
@@ -105,139 +100,52 @@ private[io] final class AsyncDnsResolver(
           log.debug("{} cached {}", mode, resolved)
           sender() ! resolved
         case None =>
-          resolveWithResolvers(name, mode, resolvers)
-            .map { resolved =>
-              if (resolved.records.nonEmpty) {
-                val minTtl = (positiveCachePolicy +: 
resolved.records.map(_.ttl)).min
-                cache.put((name, mode), resolved, minTtl)
-              } else if (negativeCachePolicy != Never) cache.put((name, mode), 
resolved, negativeCachePolicy)
-              log.debug(s"{} resolved {}", mode, resolved)
-              resolved
-            }
-            .pipeTo(sender())
-      }
-  }
-
-  private def resolveWithResolvers(
-      name: String,
-      requestType: RequestType,
-      resolvers: List[ActorRef]): Future[DnsProtocol.Resolved] =
-    if (isInetAddress(name)) {
-      Future.fromTry {
-        Try {
-          val address = InetAddress.getByName(name) // only checks validity, 
since known to be IP address
-          val record = address match {
-            case _: Inet4Address           => ARecord(name, 
Ttl.effectivelyForever, address)
-            case ipv6address: Inet6Address => AAAARecord(name, 
Ttl.effectivelyForever, ipv6address)
-            case unexpected                => throw new 
IllegalArgumentException(s"Unexpected address: $unexpected")
-          }
-          DnsProtocol.Resolved(name, record :: Nil)
-        }
-      }
-    } else {
-      resolvers match {
-        case Nil =>
-          Future.failed(ResolveFailedException(s"Failed to resolve $name with 
nameservers: $nameServers"))
-        case head :: tail =>
-          resolveWithSearch(name, requestType, head).recoverWith {
-            case NonFatal(t) =>
-              t match {
-                case _: AskTimeoutException =>
-                  log.info("Resolve of {} timed out after {}. Trying next name 
server", name, timeout.duration.pretty)
-                case _ =>
-                  log.info("Resolve of {} failed. Trying next name server {}", 
name, t.getMessage)
+          // check if we're being asked to resolve an IP address, in which 
case no need to do anything async
+          if (isInetAddress(name)) {
+            Try {
+              InetAddress.getByName(name) match { // only a validity check, 
doesn't do blocking I/O
+                case ipv4: Inet4Address => ARecord(name, 
Ttl.effectivelyForever, ipv4)
+                case ipv6: Inet6Address => AAAARecord(name, 
Ttl.effectivelyForever, ipv6)
+                case unexpected         => throw new 
IllegalArgumentException(s"Unexpected address: $unexpected")
               }
-              resolveWithResolvers(name, requestType, tail)
+            }.fold(
+              ex => { sender() ! Status.Failure(ex) },
+              record => {
+                val resolved = DnsProtocol.Resolved(name, record :: Nil)
+                cache.put(name -> mode, resolved, record.ttl)
+                sender() ! resolved
+              })
+          } else if (inFlight.contains((name, mode))) {
+            // there's already a resolution in progress for this (name, mode); 
add to waiters
+            inFlight.get((name, mode)).foreach { waiters =>
+              inFlight = inFlight.updated((name, mode), sender() :: waiters)
+            }
+          } else if (resolvers.isEmpty) {
+            sender() ! Status.Failure(failToResolve(name, nameServers))
+          } else {
+            // spawn an actor to manage this resolution (apply search names, 
failover to other resolvers, etc.)
+            inFlight = inFlight.updated((name, mode), List(sender()))
+            context.actorOf(
+              DnsResolutionActor.props(settings, idGenerator, name, mode, 
self, resolvers))
           }
       }
-    }
-
-  private def sendQuestion(resolver: ActorRef, message: DnsQuestion): 
Future[Answer] = {
-    (resolver ? message).transformWith {
-      case Success(result: Answer) =>
-        Future.successful(result)
-      case Success(DuplicateId(_)) =>
-        sendQuestion(resolver, message.withId(idGenerator.nextId()))
-      case Failure(t) =>
-        resolver ! DropRequest(message)
-        Future.failed(t)
-      case Success(a) =>
-        resolver ! DropRequest(message)
-        Future.failed(
-          new IllegalArgumentException("Unexpected response " + a.toString + " 
of type " + a.getClass.toString))
-    }
-  }
 
-  private def resolveWithSearch(
-      name: String,
-      requestType: RequestType,
-      resolver: ActorRef): Future[DnsProtocol.Resolved] = {
-    if (settings.SearchDomains.nonEmpty) {
-      val nameWithSearch = settings.SearchDomains.map(sd => name + "." + sd)
-      // ndots is a heuristic used to try and work out whether the name passed 
in is a fully qualified domain name,
-      // or a name relative to one of the search names. The idea is to prevent 
the cost of doing a lookup that is
-      // obviously not going to resolve. So, if a host has less than ndots 
dots in it, then we don't try and resolve it,
-      // instead, we go directly to the search domains, or at least that's 
what the man page for resolv.conf says. In
-      // practice, Linux appears to implement something slightly different, if 
the name being searched contains less
-      // than ndots dots, then it should be searched last, rather than first. 
This means if the heuristic wrongly
-      // identifies a domain as being relative to the search domains, it will 
still be looked up if it doesn't resolve
-      // at any of the search domains, albeit with the latency of having to 
have done all the searches first.
-      val toResolve = if (name.count(_ == '.') >= settings.NDots) {
-        name :: nameWithSearch
-      } else {
-        nameWithSearch :+ name
-      }
-      resolveFirst(toResolve, requestType, resolver)
-    } else {
-      resolve(name, requestType, resolver)
-    }
-  }
-
-  private def resolveFirst(
-      searchNames: List[String],
-      requestType: RequestType,
-      resolver: ActorRef): Future[DnsProtocol.Resolved] = {
-    searchNames match {
-      case searchName :: Nil =>
-        resolve(searchName, requestType, resolver)
-      case searchName :: remaining =>
-        resolve(searchName, requestType, resolver).flatMap { resolved =>
-          if (resolved.records.isEmpty) resolveFirst(remaining, requestType, 
resolver)
-          else Future.successful(resolved)
+    case ResolutionAnswer(name, mode, result) =>
+      inFlight.get((name, mode)).foreach { waiters =>
+        result match {
+          case Success(resolved) =>
+            if (resolved.records.nonEmpty) {
+              val minTtl = (positiveCachePolicy +: 
resolved.records.map(_.ttl)).min
+              cache.put((name, mode), resolved, minTtl)
+            } else if (negativeCachePolicy != Never)
+              cache.put((name, mode), resolved, negativeCachePolicy)
+            log.debug("{} resolved {}", mode, resolved)
+            waiters.foreach(_ ! resolved)
+          case Failure(ex) =>
+            waiters.foreach(_ ! Status.Failure(ex))
         }
-      case Nil =>
-        // This can't happen
-        Future.failed(new IllegalStateException("Failed to 'resolveFirst': 
'searchNames' must not be empty"))
-    }
-  }
-
-  private def resolve(name: String, requestType: RequestType, resolver: 
ActorRef): Future[DnsProtocol.Resolved] = {
-    log.debug("Attempting to resolve {} with {}", name, resolver)
-    val caseFoldedName = Helpers.toRootLowerCase(name)
-    requestType match {
-      case Ip(ipv4, ipv6) =>
-        val ipv4Recs: Future[Answer] =
-          if (ipv4)
-            sendQuestion(resolver, Question4(idGenerator.nextId(), 
caseFoldedName))
-          else
-            Empty
-
-        val ipv6Recs =
-          if (ipv6)
-            sendQuestion(resolver, Question6(idGenerator.nextId(), 
caseFoldedName))
-          else
-            Empty
-
-        for {
-          ipv4 <- ipv4Recs
-          ipv6 <- ipv6Recs
-        } yield DnsProtocol.Resolved(name, ipv4.rrs ++ ipv6.rrs, 
ipv4.additionalRecs ++ ipv6.additionalRecs)
-
-      case Srv =>
-        sendQuestion(resolver, SrvQuestion(idGenerator.nextId(), 
caseFoldedName)).map(answer => {
-          DnsProtocol.Resolved(name, answer.rrs, answer.additionalRecs)
-        })
-    }
+      }
+      inFlight -= ((name, mode))
   }
 
 }
@@ -267,5 +175,164 @@ private[pekko] object AsyncDnsResolver {
   private val Empty =
     Future.successful(Answer(-1, immutable.Seq.empty[ResourceRecord], 
immutable.Seq.empty[ResourceRecord]))
 
+  private[pekko] def failToResolve(name: String, nameServers: 
List[InetSocketAddress]): ResolveFailedException =
+    ResolveFailedException(s"Failed to resolve $name with nameservers 
$nameServers")
+
   case class ResolveFailedException(msg: String) extends Exception(msg)
+
+  private final case class ResolutionAnswer(
+      name: String,
+      mode: RequestType,
+      result: Try[DnsProtocol.Resolved])
+
+  private object DnsResolutionActor {
+    def props(
+        settings: DnsSettings,
+        idGenerator: IdGenerator,
+        name: String,
+        mode: RequestType,
+        responseActor: ActorRef,
+        resolvers: List[ActorRef]): Props =
+      Props(new DnsResolutionActor(settings, idGenerator, name, mode, 
responseActor, resolvers))
+  }
+
+  // Per-request actor that manages DNS resolution: applies search domains, 
fails over to other resolvers.
+  // Reports the final result back to `responseActor` (the AsyncDnsResolver) 
via `ResolutionAnswer`.
+  private class DnsResolutionActor(
+      settings: DnsSettings,
+      idGenerator: IdGenerator,
+      name: String,
+      mode: RequestType,
+      responseActor: ActorRef,
+      resolvers: List[ActorRef])
+      extends Actor
+      with ActorLogging {
+
+    private implicit val timeout: Timeout = Timeout(settings.ResolveTimeout)
+    private implicit val ec: ExecutionContextExecutor = context.dispatcher
+
+    private def failToResolve(): Unit = {
+      responseActor ! ResolutionAnswer(name, mode, 
Failure(AsyncDnsResolver.failToResolve(name, settings.NameServers)))
+      context.stop(self)
+    }
+
+    // Not strictly necessary, since our parent checks this before spawning, 
but belt-and-suspenders
+    if (resolvers.isEmpty) {
+      failToResolve() // (vacuously) exhausted the resolvers
+      // fail fast
+      throw new IllegalArgumentException("resolvers should not be empty")
+    }
+
+    // the fully-qualified domain names (FQDNs), in the order we want to 
attempt for any given resolver
+    private val namesToResolve = {
+      val nameWithSearch = settings.SearchDomains.map(sd => s"${name}.${sd}")
+
+      // ndots is a heuristic used to work out whether `name` is an FQDN 
or a name relative to a search
+      // domain. The goal is to avoid the cost of a lookup that's unlikely 
to resolve because the name must
+      // first be qualified with a search domain. If `name` contains fewer than ndots 
dots, then we look it up last,
+      // rather than first.
+      if (name.count(_ == '.') >= settings.NDots) {
+        // assume fully-qualified, so try `name` first
+        (name :: nameWithSearch).map(Helpers.toRootLowerCase)
+      } else {
+        // assume relative, so try `name` last
+        (nameWithSearch :+ name).map(Helpers.toRootLowerCase)
+      }
+    }
+
+    // Initial receive is empty; startResolution immediately calls 
context.become to switch to activelyResolving
+    override def receive: Receive = PartialFunction.empty
+    // safe, already verified that resolvers is non-empty
+    startResolution(namesToResolve, resolvers.head, resolvers.tail)
+
+    private def activelyResolving(
+        searchName: String,
+        resolver: ActorRef,
+        nextNamesToTry: List[String],
+        nextResolversToTry: List[ActorRef]): Receive = {
+      case resolved: DnsProtocol.Resolved =>
+        if (resolved.records.isEmpty) {
+          if (nextNamesToTry.nonEmpty) startResolution(nextNamesToTry, 
resolver, nextResolversToTry)
+          else handleResolved(resolved) // empty but successful response
+        } else {
+          handleResolved(resolved)
+        }
+
+      case Status.Failure(ex) =>
+        ex match {
+          case _: AskTimeoutException =>
+            log.info("Resolve of {} timed out after {}. Trying next name 
server", searchName, timeout.duration.pretty)
+          case _ =>
+            log.info("Resolve of {} failed. Trying next name server. 
Exception: {}", name, ex.getMessage)
+        }
+
+        // failed, move on to next name server
+        if (nextResolversToTry.nonEmpty)
+          startResolution(namesToResolve, nextResolversToTry.head, 
nextResolversToTry.tail)
+        else failToResolve() // exhausted the resolvers
+    }
+
+    private def startResolution(
+        namesToTry: List[String],
+        resolverToUse: ActorRef,
+        fallbackResolvers: List[ActorRef]): Unit = {
+      if (namesToTry.nonEmpty) {
+        val searchName = namesToTry.head
+
+        log.debug("Attempting to resolve {} with {}", searchName, 
resolverToUse)
+
+        resolvedFut(searchName, resolverToUse).pipeTo(self)
+
+        context.become(activelyResolving(searchName, resolverToUse, 
namesToTry.tail, fallbackResolvers))
+      } else {
+        // shouldn't happen
+        log.warning("startResolution called with empty list of namesToTry")
+        failToResolve()
+      }
+    }
+
+    private def handleResolved(resolved: DnsProtocol.Resolved): Unit = {
+      responseActor ! ResolutionAnswer(name, mode, Success(resolved))
+      context.stop(self)
+    }
+
+    private def sendQuestion(resolver: ActorRef, message: DnsQuestion): 
Future[Answer] = {
+      (resolver ? message).transformWith {
+        case Success(result: Answer) =>
+          Future.successful(result)
+        case Success(DuplicateId(_)) =>
+          sendQuestion(resolver, message.withId(idGenerator.nextId()))
+        case Failure(t) =>
+          resolver ! DropRequest(message)
+          Future.failed(t)
+        case Success(a) =>
+          resolver ! DropRequest(message)
+          Future.failed(
+            new IllegalArgumentException("Unexpected response " + a.toString + 
" of type " + a.getClass.toString))
+      }
+    }
+
+    private def resolvedFut(searchName: String, resolver: ActorRef): 
Future[DnsProtocol.Resolved] =
+      mode match {
+        case Ip(ipv4, ipv6) =>
+          val ipv4Recs =
+            if (ipv4) sendQuestion(resolver, Question4(idGenerator.nextId(), 
searchName))
+            else Empty
+
+          val ipv6Recs =
+            if (ipv6) sendQuestion(resolver, Question6(idGenerator.nextId(), 
searchName))
+            else Empty
+
+          ipv4Recs.flatMap { v4 =>
+            ipv6Recs.map { v6 =>
+              DnsProtocol.Resolved(searchName, v4.rrs ++ v6.rrs, 
v4.additionalRecs ++ v6.additionalRecs)
+            }(parasitic)
+          }(parasitic)
+
+        case Srv =>
+          sendQuestion(resolver, SrvQuestion(idGenerator.nextId(), 
searchName)).map { answer =>
+            DnsProtocol.Resolved(searchName, answer.rrs, answer.additionalRecs)
+          }(parasitic)
+      }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to