Copilot commented on code in PR #546:
URL: https://github.com/apache/pekko-management/pull/546#discussion_r2538733844
##########
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala:
##########
@@ -66,20 +67,32 @@ import scala.util.control.NonFatal
private lazy val clientSslContext: HttpsConnectionContext =
ConnectionContext.httpsClient(sslContext)
- protected val namespace: String =
-
settings.namespace.orElse(readConfigVarFromFilesystem(settings.namespacePath,
"namespace")).getOrElse("default")
+ protected val namespace: Future[String] = {
+ settings.namespace match {
+ case Some(nSpace) => Future.successful(nSpace)
+ case _ =>
+ readConfigVarFromFilesystem(settings.namespacePath,
"namespace").map(_.getOrElse("default"))(
+ ExecutionContext.parasitic)
+ }
+ }
protected val scheme: String = if (settings.secure) "https" else "http"
- private lazy val apiToken =
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").getOrElse("")
- private lazy val headers = if (settings.secure)
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+ private[pekko] def apiToken() =
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").map(
+ _.getOrElse(""))(ExecutionContext.parasitic)
+ private def headers() = if (settings.secure) {
+ apiToken().map { token =>
+ immutable.Seq(Authorization(OAuth2BearerToken(token)))
+ }(ExecutionContext.parasitic)
+ } else
+ Future.successful(Nil)
log.debug("kubernetes access namespace: {}. Secure: {}", namespace,
settings.secure)
Review Comment:
The `namespace` field is now a `Future[String]`, but it's being logged
directly. This will log the Future object representation (e.g., `Future(<not
completed>)`) instead of the actual namespace value. This logging should either
be removed or wrapped in a callback that logs the value once the Future
completes.
```suggestion
namespace.foreach { ns =>
log.debug("kubernetes access namespace: {}. Secure: {}", ns,
settings.secure)
}(system.dispatcher)
```
##########
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala:
##########
@@ -148,17 +160,34 @@ import scala.util.control.NonFatal
protected def requestForPath(
path: Uri.Path,
method: HttpMethod = HttpMethods.GET,
- entity: RequestEntity = HttpEntity.Empty): HttpRequest = {
+ entity: RequestEntity = HttpEntity.Empty): Future[HttpRequest] = {
val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port =
settings.apiServerPort).withPath(path)
- HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
+ headers().map { headers =>
+ HttpRequest(uri = uri, headers = headers, method = method, entity =
entity)
+ }(ExecutionContext.parasitic)
+ }
+
+ private[pekko] def makeRawRequest(request: HttpRequest):
Future[HttpResponse] = {
+ if (settings.secure)
+ http.singleRequest(request, clientSslContext)
+ else
+ http.singleRequest(request)
}
protected def makeRequest(request: HttpRequest, timeoutMsg: String):
Future[HttpResponse] = {
- val response =
- if (settings.secure)
- http.singleRequest(request, clientSslContext)
- else
- http.singleRequest(request)
+ // It's possible to legitimately get a 401 response due to kubernetes
doing a token rotation
+ implicit val scheduler: Scheduler = system.scheduler
+ val response = RetrySupport.retry(
+ () => makeRawRequest(request: HttpRequest),
+ (response: HttpResponse, _: Throwable) => {
+ log.warning("Received status code 401 as response, retrying due to
possible token rotation")
+ response.status == StatusCodes.Unauthorized
+ },
+ settings.tokenRetrySettings.attempts,
+ settings.tokenRetrySettings.minBackoff,
+ settings.tokenRetrySettings.maxBackoff,
+ settings.tokenRetrySettings.randomFactor
+ )
Review Comment:
The retry logic retries with the same `HttpRequest` object that was built
with the old token. When a 401 occurs due to token rotation, retrying with the
same request (containing the old token in headers) will keep failing. The
request should be rebuilt on each retry by calling `requestForPath` again to
fetch the fresh token. Consider restructuring to pass a function that builds
the request instead of the request itself.
##########
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala:
##########
@@ -148,17 +160,34 @@ import scala.util.control.NonFatal
protected def requestForPath(
path: Uri.Path,
method: HttpMethod = HttpMethods.GET,
- entity: RequestEntity = HttpEntity.Empty): HttpRequest = {
+ entity: RequestEntity = HttpEntity.Empty): Future[HttpRequest] = {
val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port =
settings.apiServerPort).withPath(path)
- HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
+ headers().map { headers =>
+ HttpRequest(uri = uri, headers = headers, method = method, entity =
entity)
+ }(ExecutionContext.parasitic)
+ }
+
+ private[pekko] def makeRawRequest(request: HttpRequest):
Future[HttpResponse] = {
+ if (settings.secure)
+ http.singleRequest(request, clientSslContext)
+ else
+ http.singleRequest(request)
}
protected def makeRequest(request: HttpRequest, timeoutMsg: String):
Future[HttpResponse] = {
- val response =
- if (settings.secure)
- http.singleRequest(request, clientSslContext)
- else
- http.singleRequest(request)
+ // It's possible to legitimately get a 401 response due to kubernetes
doing a token rotation
+ implicit val scheduler: Scheduler = system.scheduler
+ val response = RetrySupport.retry(
+ () => makeRawRequest(request: HttpRequest),
+ (response: HttpResponse, _: Throwable) => {
Review Comment:
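When retrying on a 401 response, the response body is not being consumed
before retrying. HTTP response entities should be drained using
`response.discardEntityBytes()` to prevent resource leaks and connection pool
issues. Drain only the responses that are about to be retried: this predicate
also runs for successful responses, whose entities the caller still has to
read, so discarding unconditionally would leave nothing to unmarshal. Add a
guarded `response.discardEntityBytes()` before the retry condition check.
```suggestion
(response: HttpResponse, _: Throwable) => {
if (response.status == StatusCodes.Unauthorized) response.discardEntityBytes()
```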
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]