Copilot commented on code in PR #546:
URL: https://github.com/apache/pekko-management/pull/546#discussion_r2538733844


##########
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala:
##########
@@ -66,20 +67,32 @@ import scala.util.control.NonFatal
 
   private lazy val clientSslContext: HttpsConnectionContext = 
ConnectionContext.httpsClient(sslContext)
 
-  protected val namespace: String =
-    
settings.namespace.orElse(readConfigVarFromFilesystem(settings.namespacePath, 
"namespace")).getOrElse("default")
+  protected val namespace: Future[String] = {
+    settings.namespace match {
+      case Some(nSpace) => Future.successful(nSpace)
+      case _            =>
+        readConfigVarFromFilesystem(settings.namespacePath, 
"namespace").map(_.getOrElse("default"))(
+          ExecutionContext.parasitic)
+    }
+  }
 
   protected val scheme: String = if (settings.secure) "https" else "http"
-  private lazy val apiToken = 
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").getOrElse("")
-  private lazy val headers = if (settings.secure) 
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+  private[pekko] def apiToken() = 
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").map(
+    _.getOrElse(""))(ExecutionContext.parasitic)
+  private def headers() = if (settings.secure) {
+    apiToken().map { token =>
+      immutable.Seq(Authorization(OAuth2BearerToken(token)))
+    }(ExecutionContext.parasitic)
+  } else
+    Future.successful(Nil)
 
   log.debug("kubernetes access namespace: {}. Secure: {}", namespace, 
settings.secure)

Review Comment:
   The `namespace` field is now a `Future[String]`, but it's being logged 
directly. This will log the Future object representation (e.g., `Future(<not 
completed>)`) instead of the actual namespace value. This logging should either 
be removed or wrapped in a callback that logs the value once the Future 
completes.
   ```suggestion
     namespace.foreach { ns =>
       log.debug("kubernetes access namespace: {}. Secure: {}", ns, 
settings.secure)
     }(system.dispatcher)
   ```



##########
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala:
##########
@@ -148,17 +160,34 @@ import scala.util.control.NonFatal
   protected def requestForPath(
       path: Uri.Path,
       method: HttpMethod = HttpMethods.GET,
-      entity: RequestEntity = HttpEntity.Empty): HttpRequest = {
+      entity: RequestEntity = HttpEntity.Empty): Future[HttpRequest] = {
     val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port = 
settings.apiServerPort).withPath(path)
-    HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
+    headers().map { headers =>
+      HttpRequest(uri = uri, headers = headers, method = method, entity = 
entity)
+    }(ExecutionContext.parasitic)
+  }
+
+  private[pekko] def makeRawRequest(request: HttpRequest): 
Future[HttpResponse] = {
+    if (settings.secure)
+      http.singleRequest(request, clientSslContext)
+    else
+      http.singleRequest(request)
   }
 
   protected def makeRequest(request: HttpRequest, timeoutMsg: String): 
Future[HttpResponse] = {
-    val response =
-      if (settings.secure)
-        http.singleRequest(request, clientSslContext)
-      else
-        http.singleRequest(request)
+    // It's possible to legitimately get a 401 response due to kubernetes 
doing a token rotation
+    implicit val scheduler: Scheduler = system.scheduler
+    val response = RetrySupport.retry(
+      () => makeRawRequest(request: HttpRequest),
+      (response: HttpResponse, _: Throwable) => {
+        log.warning("Received status code 401 as response, retrying due to 
possible token rotation")
+        response.status == StatusCodes.Unauthorized
+      },
+      settings.tokenRetrySettings.attempts,
+      settings.tokenRetrySettings.minBackoff,
+      settings.tokenRetrySettings.maxBackoff,
+      settings.tokenRetrySettings.randomFactor
+    )

Review Comment:
   The retry logic reuses the same `HttpRequest` object, which was built with 
the old token. When a 401 occurs due to token rotation, retrying that same 
request (with the stale token still in its `Authorization` header) will keep 
failing. The request should be rebuilt on each attempt by calling 
`requestForPath` again: `headers()` now calls `apiToken()`, which re-reads the 
token file on every call, so a rebuilt request picks up the rotated token. 
Consider restructuring `makeRequest` to accept a function that builds the 
request instead of the request itself.



##########
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala:
##########
@@ -148,17 +160,34 @@ import scala.util.control.NonFatal
   protected def requestForPath(
       path: Uri.Path,
       method: HttpMethod = HttpMethods.GET,
-      entity: RequestEntity = HttpEntity.Empty): HttpRequest = {
+      entity: RequestEntity = HttpEntity.Empty): Future[HttpRequest] = {
     val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port = 
settings.apiServerPort).withPath(path)
-    HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
+    headers().map { headers =>
+      HttpRequest(uri = uri, headers = headers, method = method, entity = 
entity)
+    }(ExecutionContext.parasitic)
+  }
+
+  private[pekko] def makeRawRequest(request: HttpRequest): 
Future[HttpResponse] = {
+    if (settings.secure)
+      http.singleRequest(request, clientSslContext)
+    else
+      http.singleRequest(request)
   }
 
   protected def makeRequest(request: HttpRequest, timeoutMsg: String): 
Future[HttpResponse] = {
-    val response =
-      if (settings.secure)
-        http.singleRequest(request, clientSslContext)
-      else
-        http.singleRequest(request)
+    // It's possible to legitimately get a 401 response due to kubernetes 
doing a token rotation
+    implicit val scheduler: Scheduler = system.scheduler
+    val response = RetrySupport.retry(
+      () => makeRawRequest(request: HttpRequest),
+      (response: HttpResponse, _: Throwable) => {

Review Comment:
   When retrying on a 401 response, the response body is not consumed before 
the retry. HTTP response entities must be drained with 
`response.discardEntityBytes()` to prevent resource leaks and connection-pool 
starvation. Drain only the 401 response that is about to be retried: the retry 
predicate also sees responses that will be returned to the caller, and 
discarding those would consume a body the caller still needs to read.
   ```suggestion
         (response: HttpResponse, _: Throwable) => {
           if (response.status == StatusCodes.Unauthorized) response.discardEntityBytes()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to