parthchandra commented on code in PR #53840:
URL: https://github.com/apache/spark/pull/53840#discussion_r2825786599


##########
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##########
@@ -459,32 +462,53 @@ class ExecutorPodsAllocator(
         .build()
       val resources = replacePVCsIfNeeded(
         podWithAttachedContainer, 
resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
-      val createdExecutorPod =
-        
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
-      try {
-        addOwnerReference(createdExecutorPod, resources)
-        resources
-          .filter(_.getKind == "PersistentVolumeClaim")
-          .foreach { resource =>
-            if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
-              addOwnerReference(driverPod.get, Seq(resource))
-            }
-            val pvc = resource.asInstanceOf[PersistentVolumeClaim]
-            logInfo(log"Trying to create PersistentVolumeClaim " +
-              log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)} 
with " +
-              log"StorageClass ${MDC(LogKeys.CLASS_NAME, 
pvc.getSpec.getStorageClassName)}")
-            
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
-            PVC_COUNTER.incrementAndGet()
-          }
-        newlyCreatedExecutors(newExecutorId) = (resourceProfileId, 
clock.getTimeMillis())
-        logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+      val optCreatedExecutorPod = try {
+        Some(kubernetesClient
+          .pods()
+          .inNamespace(namespace)
+          .resource(podWithAttachedContainer)
+          .create())
       } catch {
         case NonFatal(e) =>
-          kubernetesClient.pods()
-            .inNamespace(namespace)
-            .resource(createdExecutorPod)
-            .delete()
-          throw e
+          // Register failure with global tracker if lifecycle manager is 
available
+          val failureCount = totalFailedPodCreations.incrementAndGet()
+          if (executorPodsLifecycleManager != null) {
+            executorPodsLifecycleManager.registerPodCreationFailure()
+          }
+          logError(log"Failed to create executor pod 
${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
+            log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
+          None
+      }
+      optCreatedExecutorPod.foreach { createdExecutorPod =>
+        try {

Review Comment:
   It appears that errors from owner-reference and PVC creation are handled 
differently (they throw an exception). I've added them to the lifecycle 
manager's tracking but also retained the current behaviour of throwing an 
exception in these cases.
   Would you prefer we stop throwing an exception and have everything handled 
by the lifecycle manager?



##########
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala:
##########
@@ -1027,4 +1037,22 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
       KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt,
         k8sConf.resourceProfileId.toInt), Seq.empty)
   }
+
+test("Pod creation failures are tracked by ExecutorFailureTracker") {

Review Comment:
   Fixed



##########
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala:
##########
@@ -158,8 +159,11 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
       podsPollingEventSource)
   }
 
-  private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, 
kubernetesClient: KubernetesClient,
-      snapshotsStore: ExecutorPodsSnapshotsStore) = {
+  private[k8s] def makeExecutorPodsAllocator(
+      sc: SparkContext,
+      kubernetesClient: KubernetesClient,
+      snapshotsStore: ExecutorPodsSnapshotsStore,
+      lifecycleManager: ExecutorPodsLifecycleManager) = {

Review Comment:
   Good suggestion. Done.



##########
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala:
##########
@@ -1027,4 +1037,22 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
       KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt,
         k8sConf.resourceProfileId.toInt), Seq.empty)
   }
+
+test("Pod creation failures are tracked by ExecutorFailureTracker") {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to