This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 579e2de947 Port akka-core PR #31906 and #31926: DnsResolutionActor and
in-flight deduplication (#2919)
579e2de947 is described below
commit 579e2de947385da4ed6bca4e6b98e031385ce765
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 27 10:30:33 2026 +0200
Port akka-core PR #31906 and #31926: DnsResolutionActor and in-flight
deduplication (#2919)
* feat: port akka-core PR #31906 and #31926 - DnsResolutionActor and
in-flight dedup
- Replace future-based resolution in AsyncDnsResolver with a per-request
DnsResolutionActor (ported from akka-core PR #31906)
- Add in-flight deduplication: concurrent resolves for the same (name, mode)
share a single DNS lookup
- DnsResolutionActor handles search domains, nameserver failover, and
DuplicateId retries using Pekko's existing IdGenerator
- ResolutionAnswer message routes results through AsyncDnsResolver
so it can update cache and notify all waiting senders
- Port three new tests from akka-core PR #31926:
- 'attempt to drop a failed question on timeout'
- 'not reuse the request ids of pending requests'
- 'reuse in-progress resolutions'
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/dc5393cc-d827-4840-85f5-07f67c5142ac
Co-authored-by: pjfanning <[email protected]>
* refactor: address code review - fix double map lookup, improve comment
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/dc5393cc-d827-4840-85f5-07f67c5142ac
Co-authored-by: pjfanning <[email protected]>
* scalafmt
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../io/dns/internal/AsyncDnsResolverSpec.scala | 55 +++-
.../pekko/io/dns/internal/AsyncDnsResolver.scala | 347 ++++++++++++---------
2 files changed, 261 insertions(+), 141 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
index 51b51d88eb..a3dfb031ac 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
@@ -26,7 +26,7 @@ import pekko.io.dns.{ AAAARecord, ARecord, DnsSettings,
IdGenerator, SRVRecord }
import pekko.io.dns.CachePolicy.Ttl
import pekko.io.dns.DnsProtocol._
import pekko.io.dns.internal.AsyncDnsResolver.ResolveFailedException
-import pekko.io.dns.internal.DnsClient.{ Answer, DuplicateId, Question4,
Question6, SrvQuestion }
+import pekko.io.dns.internal.DnsClient.{ Answer, DropRequest, DuplicateId,
Question4, Question6, SrvQuestion }
import pekko.testkit.{ PekkoSpec, TestProbe, WithLogCapturing }
import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory }
@@ -300,6 +300,59 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
senderProbe.expectMsg(Resolved("cats.com", im.Seq(ipv4Record)))
}
+
+ "attempt to drop a failed question on timeout" in new Setup {
+ val configWithExtraShortTimeout =
+ defaultConfig.withValue("resolve-timeout",
ConfigValueFactory.fromAnyRef("1 ms"))
+ override val r = resolver(List(dnsClient1.ref),
configWithExtraShortTimeout)
+
+ r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false))
+ val q4 = dnsClient1.expectMsgPF() {
+ case q: Question4 if q.name == "cats.com" => q
+ }
+ // ask times out because no reply; the DnsResolutionActor should drop
the pending question
+ dnsClient1.expectMsgPF(remainingOrDefault) {
+ case DropRequest(dropped) if dropped == q4 =>
+ }
+ }
+
+ "not reuse the request ids of pending requests" in new Setup {
+ // Send multiple resolves for different names so no in-flight
deduplication applies
+ val resolveCount = 10
+ (1 to resolveCount).foreach { i =>
+ r.tell(Resolve(s"host$i.cats.com", Ip(ipv4 = true, ipv6 = false)),
senderProbe.ref)
+ }
+
+ // Each resolve should have received a Question4 from dnsClient1 with a
unique ID
+ val receivedIds = (1 to resolveCount).map { _ =>
+ dnsClient1.expectMsgPF(remainingOrDefault) {
+ case Question4(id, _) => id
+ }
+ }
+ receivedIds.toSet.size shouldBe resolveCount
+ }
+
+ "reuse in-progress resolutions" in new Setup {
+ val asker1 = TestProbe()
+ val asker2 = TestProbe()
+
+ override val r = resolver(List(dnsClient1.ref), defaultConfig)
+
+ val resolve = Resolve("cats.com", Ip(ipv4 = true, ipv6 = false))
+
+ r.tell(resolve, asker1.ref)
+ val firstId = dnsClient1.expectMsgPF() {
+ case q4: Question4 if q4.name == "cats.com" => q4.id
+ }
+ // Send a second identical resolve while the first is still pending
+ r.tell(resolve, asker2.ref)
+ // No second DNS question should be sent; the second resolve reuses the
in-progress one
+ dnsClient1.expectNoMessage(50.millis)
+ dnsClient1.reply(Answer(firstId, Nil))
+
+ asker1.expectMsg(Resolved("cats.com", im.Seq.empty))
+ asker2.expectMsg(Resolved("cats.com", im.Seq.empty))
+ }
}
def resolver(clients: List[ActorRef], config: Config): ActorRef = {
diff --git
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
index aced67214c..7804a74784 100644
---
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
+++
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
@@ -16,15 +16,12 @@ package org.apache.pekko.io.dns.internal
import java.net.{ Inet4Address, Inet6Address, InetAddress, InetSocketAddress }
import scala.collection.immutable
-import scala.concurrent.ExecutionContextExecutor
-import scala.concurrent.Future
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
-import scala.util.control.NonFatal
+import scala.concurrent.{ ExecutionContextExecutor, Future }
+import scala.concurrent.ExecutionContext.parasitic
+import scala.util.{ Failure, Success, Try }
import org.apache.pekko
-import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorRefFactory }
+import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorRefFactory, Props,
Status }
import pekko.annotation.InternalApi
import pekko.io.SimpleDnsCache
import pekko.io.dns._
@@ -56,8 +53,6 @@ private[io] final class AsyncDnsResolver(
import AsyncDnsResolver._
- implicit val ec: ExecutionContextExecutor = context.dispatcher
-
// avoid ever looking up localhost by pre-populating cache
{
val loopback = InetAddress.getLoopbackAddress
@@ -81,9 +76,6 @@ private[io] final class AsyncDnsResolver(
}
- // For ask to DNS Client
- implicit val timeout: Timeout = Timeout(settings.ResolveTimeout)
-
val nameServers = settings.NameServers
val positiveCachePolicy = settings.PositiveCachePolicy
@@ -96,6 +88,9 @@ private[io] final class AsyncDnsResolver(
private val resolvers: List[ActorRef] = clientFactory(context, nameServers)
+ // tracks in-flight resolutions by (name, requestType) -> list of senders
waiting for the result
+ private var inFlight: Map[(String, RequestType), List[ActorRef]] = Map.empty
+
// only supports DnsProtocol, not the deprecated Dns protocol
// AsyncDnsManager converts between the protocols to support the deprecated
protocol
override def receive: Receive = {
@@ -105,139 +100,52 @@ private[io] final class AsyncDnsResolver(
log.debug("{} cached {}", mode, resolved)
sender() ! resolved
case None =>
- resolveWithResolvers(name, mode, resolvers)
- .map { resolved =>
- if (resolved.records.nonEmpty) {
- val minTtl = (positiveCachePolicy +:
resolved.records.map(_.ttl)).min
- cache.put((name, mode), resolved, minTtl)
- } else if (negativeCachePolicy != Never) cache.put((name, mode),
resolved, negativeCachePolicy)
- log.debug(s"{} resolved {}", mode, resolved)
- resolved
- }
- .pipeTo(sender())
- }
- }
-
- private def resolveWithResolvers(
- name: String,
- requestType: RequestType,
- resolvers: List[ActorRef]): Future[DnsProtocol.Resolved] =
- if (isInetAddress(name)) {
- Future.fromTry {
- Try {
- val address = InetAddress.getByName(name) // only checks validity,
since known to be IP address
- val record = address match {
- case _: Inet4Address => ARecord(name,
Ttl.effectivelyForever, address)
- case ipv6address: Inet6Address => AAAARecord(name,
Ttl.effectivelyForever, ipv6address)
- case unexpected => throw new
IllegalArgumentException(s"Unexpected address: $unexpected")
- }
- DnsProtocol.Resolved(name, record :: Nil)
- }
- }
- } else {
- resolvers match {
- case Nil =>
- Future.failed(ResolveFailedException(s"Failed to resolve $name with
nameservers: $nameServers"))
- case head :: tail =>
- resolveWithSearch(name, requestType, head).recoverWith {
- case NonFatal(t) =>
- t match {
- case _: AskTimeoutException =>
- log.info("Resolve of {} timed out after {}. Trying next name
server", name, timeout.duration.pretty)
- case _ =>
- log.info("Resolve of {} failed. Trying next name server {}",
name, t.getMessage)
+ // check if we're being asked to resolve an IP address, in which
case no need to do anything async
+ if (isInetAddress(name)) {
+ Try {
+ InetAddress.getByName(name) match { // only a validity check,
doesn't do blocking I/O
+ case ipv4: Inet4Address => ARecord(name,
Ttl.effectivelyForever, ipv4)
+ case ipv6: Inet6Address => AAAARecord(name,
Ttl.effectivelyForever, ipv6)
+ case unexpected => throw new
IllegalArgumentException(s"Unexpected address: $unexpected")
}
- resolveWithResolvers(name, requestType, tail)
+ }.fold(
+ ex => { sender() ! Status.Failure(ex) },
+ record => {
+ val resolved = DnsProtocol.Resolved(name, record :: Nil)
+ cache.put(name -> mode, resolved, record.ttl)
+ sender() ! resolved
+ })
+ } else if (inFlight.contains((name, mode))) {
+ // there's already a resolution in progress for this (name, mode);
add to waiters
+ inFlight.get((name, mode)).foreach { waiters =>
+ inFlight = inFlight.updated((name, mode), sender() :: waiters)
+ }
+ } else if (resolvers.isEmpty) {
+ sender() ! Status.Failure(failToResolve(name, nameServers))
+ } else {
+ // spawn an actor to manage this resolution (apply search names,
failover to other resolvers, etc.)
+ inFlight = inFlight.updated((name, mode), List(sender()))
+ context.actorOf(
+ DnsResolutionActor.props(settings, idGenerator, name, mode,
self, resolvers))
}
}
- }
-
- private def sendQuestion(resolver: ActorRef, message: DnsQuestion):
Future[Answer] = {
- (resolver ? message).transformWith {
- case Success(result: Answer) =>
- Future.successful(result)
- case Success(DuplicateId(_)) =>
- sendQuestion(resolver, message.withId(idGenerator.nextId()))
- case Failure(t) =>
- resolver ! DropRequest(message)
- Future.failed(t)
- case Success(a) =>
- resolver ! DropRequest(message)
- Future.failed(
- new IllegalArgumentException("Unexpected response " + a.toString + "
of type " + a.getClass.toString))
- }
- }
- private def resolveWithSearch(
- name: String,
- requestType: RequestType,
- resolver: ActorRef): Future[DnsProtocol.Resolved] = {
- if (settings.SearchDomains.nonEmpty) {
- val nameWithSearch = settings.SearchDomains.map(sd => name + "." + sd)
- // ndots is a heuristic used to try and work out whether the name passed
in is a fully qualified domain name,
- // or a name relative to one of the search names. The idea is to prevent
the cost of doing a lookup that is
- // obviously not going to resolve. So, if a host has less than ndots
dots in it, then we don't try and resolve it,
- // instead, we go directly to the search domains, or at least that's
what the man page for resolv.conf says. In
- // practice, Linux appears to implement something slightly different, if
the name being searched contains less
- // than ndots dots, then it should be searched last, rather than first.
This means if the heuristic wrongly
- // identifies a domain as being relative to the search domains, it will
still be looked up if it doesn't resolve
- // at any of the search domains, albeit with the latency of having to
have done all the searches first.
- val toResolve = if (name.count(_ == '.') >= settings.NDots) {
- name :: nameWithSearch
- } else {
- nameWithSearch :+ name
- }
- resolveFirst(toResolve, requestType, resolver)
- } else {
- resolve(name, requestType, resolver)
- }
- }
-
- private def resolveFirst(
- searchNames: List[String],
- requestType: RequestType,
- resolver: ActorRef): Future[DnsProtocol.Resolved] = {
- searchNames match {
- case searchName :: Nil =>
- resolve(searchName, requestType, resolver)
- case searchName :: remaining =>
- resolve(searchName, requestType, resolver).flatMap { resolved =>
- if (resolved.records.isEmpty) resolveFirst(remaining, requestType,
resolver)
- else Future.successful(resolved)
+ case ResolutionAnswer(name, mode, result) =>
+ inFlight.get((name, mode)).foreach { waiters =>
+ result match {
+ case Success(resolved) =>
+ if (resolved.records.nonEmpty) {
+ val minTtl = (positiveCachePolicy +:
resolved.records.map(_.ttl)).min
+ cache.put((name, mode), resolved, minTtl)
+ } else if (negativeCachePolicy != Never)
+ cache.put((name, mode), resolved, negativeCachePolicy)
+ log.debug("{} resolved {}", mode, resolved)
+ waiters.foreach(_ ! resolved)
+ case Failure(ex) =>
+ waiters.foreach(_ ! Status.Failure(ex))
}
- case Nil =>
- // This can't happen
- Future.failed(new IllegalStateException("Failed to 'resolveFirst':
'searchNames' must not be empty"))
- }
- }
-
- private def resolve(name: String, requestType: RequestType, resolver:
ActorRef): Future[DnsProtocol.Resolved] = {
- log.debug("Attempting to resolve {} with {}", name, resolver)
- val caseFoldedName = Helpers.toRootLowerCase(name)
- requestType match {
- case Ip(ipv4, ipv6) =>
- val ipv4Recs: Future[Answer] =
- if (ipv4)
- sendQuestion(resolver, Question4(idGenerator.nextId(),
caseFoldedName))
- else
- Empty
-
- val ipv6Recs =
- if (ipv6)
- sendQuestion(resolver, Question6(idGenerator.nextId(),
caseFoldedName))
- else
- Empty
-
- for {
- ipv4 <- ipv4Recs
- ipv6 <- ipv6Recs
- } yield DnsProtocol.Resolved(name, ipv4.rrs ++ ipv6.rrs,
ipv4.additionalRecs ++ ipv6.additionalRecs)
-
- case Srv =>
- sendQuestion(resolver, SrvQuestion(idGenerator.nextId(),
caseFoldedName)).map(answer => {
- DnsProtocol.Resolved(name, answer.rrs, answer.additionalRecs)
- })
- }
+ }
+ inFlight -= ((name, mode))
}
}
@@ -267,5 +175,164 @@ private[pekko] object AsyncDnsResolver {
private val Empty =
Future.successful(Answer(-1, immutable.Seq.empty[ResourceRecord],
immutable.Seq.empty[ResourceRecord]))
+ private[pekko] def failToResolve(name: String, nameServers:
List[InetSocketAddress]): ResolveFailedException =
+ ResolveFailedException(s"Failed to resolve $name with nameservers
$nameServers")
+
case class ResolveFailedException(msg: String) extends Exception(msg)
+
+ private final case class ResolutionAnswer(
+ name: String,
+ mode: RequestType,
+ result: Try[DnsProtocol.Resolved])
+
+ private object DnsResolutionActor {
+ def props(
+ settings: DnsSettings,
+ idGenerator: IdGenerator,
+ name: String,
+ mode: RequestType,
+ responseActor: ActorRef,
+ resolvers: List[ActorRef]): Props =
+ Props(new DnsResolutionActor(settings, idGenerator, name, mode,
responseActor, resolvers))
+ }
+
+ // Per-request actor that manages DNS resolution: applies search domains,
fails over to other resolvers.
+ // Reports the final result back to `responseActor` (the AsyncDnsResolver)
via `ResolutionAnswer`.
+ private class DnsResolutionActor(
+ settings: DnsSettings,
+ idGenerator: IdGenerator,
+ name: String,
+ mode: RequestType,
+ responseActor: ActorRef,
+ resolvers: List[ActorRef])
+ extends Actor
+ with ActorLogging {
+
+ private implicit val timeout: Timeout = Timeout(settings.ResolveTimeout)
+ private implicit val ec: ExecutionContextExecutor = context.dispatcher
+
+ private def failToResolve(): Unit = {
+ responseActor ! ResolutionAnswer(name, mode,
Failure(AsyncDnsResolver.failToResolve(name, settings.NameServers)))
+ context.stop(self)
+ }
+
+ // Not strictly necessary, since our parent checks this before spawning,
but belt-and-suspenders
+ if (resolvers.isEmpty) {
+ failToResolve() // (vacuously) exhausted the resolvers
+ // fail fast
+ throw new IllegalArgumentException("resolvers should not be empty")
+ }
+
+ // the fully-qualified domain names (FQDNs), in the order we want to
attempt for any given resolver
+ private val namesToResolve = {
+ val nameWithSearch = settings.SearchDomains.map(sd => s"${name}.${sd}")
+
+ // ndots is a heuristic used to attempt to work out if `name` is an FQDN
or a name relative to a search
+ // name. The goal is to not incur the cost of a lookup that's unlikely
to resolve because we need to
+ // relativize it. If the name being searched contains less than ndots
dots, then we look it up last,
+ // rather than first.
+ if (name.count(_ == '.') >= settings.NDots) {
+ // assume fully-qualified, so try `name` first
+ (name :: nameWithSearch).map(Helpers.toRootLowerCase)
+ } else {
+ // assume relative, so try `name` last
+ (nameWithSearch :+ name).map(Helpers.toRootLowerCase)
+ }
+ }
+
+ // Initial receive is empty; startResolution immediately calls
context.become to switch to activelyResolving
+ override def receive: Receive = PartialFunction.empty
+ // safe, already verified that resolvers is non-empty
+ startResolution(namesToResolve, resolvers.head, resolvers.tail)
+
+ private def activelyResolving(
+ searchName: String,
+ resolver: ActorRef,
+ nextNamesToTry: List[String],
+ nextResolversToTry: List[ActorRef]): Receive = {
+ case resolved: DnsProtocol.Resolved =>
+ if (resolved.records.isEmpty) {
+ if (nextNamesToTry.nonEmpty) startResolution(nextNamesToTry,
resolver, nextResolversToTry)
+ else handleResolved(resolved) // empty but successful response
+ } else {
+ handleResolved(resolved)
+ }
+
+ case Status.Failure(ex) =>
+ ex match {
+ case _: AskTimeoutException =>
+ log.info("Resolve of {} timed out after {}. Trying next name
server", searchName, timeout.duration.pretty)
+ case _ =>
+ log.info("Resolve of {} failed. Trying next name server.
Exception: {}", name, ex.getMessage)
+ }
+
+ // failed, move on to next name server
+ if (nextResolversToTry.nonEmpty)
+ startResolution(namesToResolve, nextResolversToTry.head,
nextResolversToTry.tail)
+ else failToResolve() // exhausted the resolvers
+ }
+
+ private def startResolution(
+ namesToTry: List[String],
+ resolverToUse: ActorRef,
+ fallbackResolvers: List[ActorRef]): Unit = {
+ if (namesToTry.nonEmpty) {
+ val searchName = namesToTry.head
+
+ log.debug("Attempting to resolve {} with {}", searchName,
resolverToUse)
+
+ resolvedFut(searchName, resolverToUse).pipeTo(self)
+
+ context.become(activelyResolving(searchName, resolverToUse,
namesToTry.tail, fallbackResolvers))
+ } else {
+ // shouldn't happen
+ log.warning("startResolution called with empty list of namesToTry")
+ failToResolve()
+ }
+ }
+
+ private def handleResolved(resolved: DnsProtocol.Resolved): Unit = {
+ responseActor ! ResolutionAnswer(name, mode, Success(resolved))
+ context.stop(self)
+ }
+
+ private def sendQuestion(resolver: ActorRef, message: DnsQuestion):
Future[Answer] = {
+ (resolver ? message).transformWith {
+ case Success(result: Answer) =>
+ Future.successful(result)
+ case Success(DuplicateId(_)) =>
+ sendQuestion(resolver, message.withId(idGenerator.nextId()))
+ case Failure(t) =>
+ resolver ! DropRequest(message)
+ Future.failed(t)
+ case Success(a) =>
+ resolver ! DropRequest(message)
+ Future.failed(
+ new IllegalArgumentException("Unexpected response " + a.toString +
" of type " + a.getClass.toString))
+ }
+ }
+
+ private def resolvedFut(searchName: String, resolver: ActorRef):
Future[DnsProtocol.Resolved] =
+ mode match {
+ case Ip(ipv4, ipv6) =>
+ val ipv4Recs =
+ if (ipv4) sendQuestion(resolver, Question4(idGenerator.nextId(),
searchName))
+ else Empty
+
+ val ipv6Recs =
+ if (ipv6) sendQuestion(resolver, Question6(idGenerator.nextId(),
searchName))
+ else Empty
+
+ ipv4Recs.flatMap { v4 =>
+ ipv6Recs.map { v6 =>
+ DnsProtocol.Resolved(searchName, v4.rrs ++ v6.rrs,
v4.additionalRecs ++ v6.additionalRecs)
+ }(parasitic)
+ }(parasitic)
+
+ case Srv =>
+ sendQuestion(resolver, SrvQuestion(idGenerator.nextId(),
searchName)).map { answer =>
+ DnsProtocol.Resolved(searchName, answer.rrs, answer.additionalRecs)
+ }(parasitic)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]