This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new a1b66d53 [YUNIKORN-2875] E2E: Launch spark from official image to avoid client java dependency (#956)
a1b66d53 is described below

commit a1b66d53be946a2fb63a32ea694742073366f48a
Author: kaichiachen <kaichia...@gmail.com>
AuthorDate: Tue Mar 11 18:01:33 2025 -0500

    [YUNIKORN-2875] E2E: Launch spark from official image to avoid client java dependency (#956)
    
    Closes: #956
    
    Signed-off-by: Craig Condit <ccon...@apache.org>
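
    In outline, the test now starts a long-lived "spark-client" pod from the official
    apache/spark image and drives spark-submit through kubectl exec, so the CI host no
    longer needs a local Java/Spark installation. A minimal sketch of that pattern follows;
    the namespace, service account, pod name and image tags below are illustrative values,
    not the exact ones generated by the test:

        # helper pod from the official image, kept alive so we can exec into it
        kubectl run spark-client --image=apache/spark:3.5.5-java17 -n spark-test \
            --overrides='{"spec": {"serviceAccountName": "spark-sa"}}' -- sleep infinity
        kubectl wait --for=condition=ready pod/spark-client -n spark-test --timeout=300s

        # submit a job from inside the pod; the in-cluster API endpoint is available
        # there via the standard KUBERNETES_SERVICE_* environment variables
        kubectl exec spark-client -n spark-test -- bash -c \
            '/opt/spark/bin/spark-submit \
                --master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT} \
                --deploy-mode cluster \
                --name spark-pi-example \
                --conf spark.kubernetes.namespace=spark-test \
                --conf spark.kubernetes.container.image=apache/spark:3.5.5-java17 \
                --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
                local:///opt/spark/examples/src/main/python/pi.py 100'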
---
 Makefile                                           | 19 ++-----
 .../spark_jobs_scheduling_test.go                  | 37 +++++--------
 test/e2e/testdata/spark_jobs.sh                    | 62 +++++++++++-----------
 3 files changed, 47 insertions(+), 71 deletions(-)

diff --git a/Makefile b/Makefile
index f73963cb..6fde1a02 100644
--- a/Makefile
+++ b/Makefile
@@ -191,13 +191,11 @@ HELM_ARCHIVE_BASE=$(OS)-$(EXEC_ARCH)
 export PATH := $(BASE_DIR)/$(HELM_PATH):$(PATH)
 
 # spark
-export SPARK_VERSION=3.3.3
+export SPARK_VERSION=3.5.5-java17
 # sometimes the image is not available with $SPARK_VERSION, the minor version must match
-export SPARK_PYTHON_VERSION=3.3.1
-export SPARK_HOME=$(BASE_DIR)$(TOOLS_DIR)/spark-v$(SPARK_VERSION)
-export SPARK_SUBMIT_CMD=$(SPARK_HOME)/bin/spark-submit
+export SPARK_PYTHON_VERSION=3.4.0
+export SPARK_IMAGE=apache/spark:$(SPARK_VERSION)
 export SPARK_PYTHON_IMAGE=docker.io/apache/spark-py:v$(SPARK_PYTHON_VERSION)
-export PATH := $(SPARK_HOME):$(PATH)
 
 # go-licenses
 GO_LICENSES_VERSION=v1.6.0
@@ -270,7 +268,7 @@ print_helm_version:
 
 # Install tools
 .PHONY: tools
-tools: $(SHELLCHECK_BIN) $(GOLANGCI_LINT_BIN) $(KUBECTL_BIN) $(KIND_BIN) $(HELM_BIN) $(SPARK_SUBMIT_CMD) $(GO_LICENSES_BIN) $(GINKGO_BIN)
+tools: $(SHELLCHECK_BIN) $(GOLANGCI_LINT_BIN) $(KUBECTL_BIN) $(KIND_BIN) $(HELM_BIN) $(GO_LICENSES_BIN) $(GINKGO_BIN)
 
 # Install shellcheck
 $(SHELLCHECK_BIN):
@@ -309,15 +307,6 @@ $(HELM_BIN):
        @curl -sSfL "https://get.helm.sh/$(HELM_ARCHIVE)" \
                | tar -x -z --strip-components=1 -C "$(HELM_PATH)" "$(HELM_ARCHIVE_BASE)/helm"
 
-# Install spark
-$(SPARK_SUBMIT_CMD):
-       @echo "installing spark v$(SPARK_VERSION)"
-       @rm -rf "$(SPARK_HOME)" "$(SPARK_HOME).tmp"
-       @mkdir -p "$(SPARK_HOME).tmp"
-       @curl -sSfL "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz" \
-               | tar -x -z --strip-components=1 -C "$(SPARK_HOME).tmp" 
-       @mv -f "$(SPARK_HOME).tmp" "$(SPARK_HOME)"
-
 # Install go-licenses
 $(GO_LICENSES_BIN):
        @echo "installing go-licenses $(GO_LICENSES_VERSION)"
diff --git a/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go b/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
index 4bd2ad21..3c4a661b 100644
--- a/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
+++ b/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
@@ -21,7 +21,6 @@ package spark_jobs_scheduling
 import (
        "context"
        "fmt"
-       "net/url"
        "os"
        "os/exec"
        "sort"
@@ -29,7 +28,6 @@ import (
 
        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/wait"
-       "k8s.io/client-go/rest"
 
        "github.com/apache/yunikorn-core/pkg/webservice/dao"
        tests "github.com/apache/yunikorn-k8shim/test/e2e"
@@ -46,16 +44,17 @@ var _ = Describe("", func() {
        var exErr error
        var sparkNS = "spark-" + common.RandSeq(10)
        var svcAcc = "svc-acc-" + common.RandSeq(10)
-       var config *rest.Config
-       var masterURL string
        var roleName = "spark-jobs-role-" + common.RandSeq(5)
        var clusterEditRole = "edit"
-       var sparkImage = os.Getenv("SPARK_PYTHON_IMAGE")
+       var sparkImage = os.Getenv("SPARK_IMAGE")
+       var sparkPyImage = os.Getenv("SPARK_PYTHON_IMAGE")
        var sparkExecutorCount = 3
 
        BeforeEach(func() {
                By(fmt.Sprintf("Spark image is: %s", sparkImage))
                Ω(sparkImage).NotTo(BeEmpty())
+               By(fmt.Sprintf("Spark_py image is: %s", sparkPyImage))
+               Ω(sparkPyImage).NotTo(BeEmpty())
                kClient = k8s.KubeCtl{}
                Ω(kClient.SetClient()).To(BeNil())
                Ω(exErr).NotTo(HaveOccurred())
@@ -71,21 +70,6 @@ var _ = Describe("", func() {
                By(fmt.Sprintf("Creating cluster role binding: %s for spark 
jobs", roleName))
                _, err = kClient.CreateClusterRoleBinding(roleName, 
clusterEditRole, sparkNS, svcAcc)
                Ω(err).NotTo(HaveOccurred())
-
-               config, err = kClient.GetKubeConfig()
-               Ω(err).NotTo(HaveOccurred())
-
-               u, err := url.Parse(config.Host)
-               Ω(err).NotTo(HaveOccurred())
-               port := u.Port()
-               if port == "" {
-                       port = "443"
-                       if u.Scheme == "http" {
-                               port = "80"
-                       }
-               }
-               masterURL = u.Scheme + "://" + u.Hostname() + ":" + port
-               By(fmt.Sprintf("MasterURL info is %s ", masterURL))
        })
 
        It("Test_With_Spark_Jobs", func() {
@@ -93,8 +77,8 @@ var _ = Describe("", func() {
                err := exec.Command(
                        "bash",
                        "../testdata/spark_jobs.sh",
-                       masterURL,
                        sparkImage,
+                       sparkPyImage,
                        sparkNS,
                        svcAcc,
                        string(rune(sparkExecutorCount))).Run()
@@ -110,12 +94,15 @@ var _ = Describe("", func() {
                err = restClient.WaitforQueueToAppear(configmanager.DefaultPartition, sparkQueueName, 120)
                Ω(err).NotTo(HaveOccurred())
 
-               By(fmt.Sprintf("Get apps from specific queue: %s", sparkNS))
+               By(fmt.Sprintf("Get apps from specific queue: %s", sparkQueueName))
                var appsFromQueue []*dao.ApplicationDAOInfo
                // Poll for apps to appear in the queue
                err = wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, time.Duration(120)*time.Second, false, func(context.Context) (done bool, err error) {
-                       appsFromQueue, err = restClient.GetApps(configmanager.DefaultPartition, configmanager.RootQueue+"."+sparkNS)
-                       if len(appsFromQueue) == 3 {
+                       appsFromQueue, err = restClient.GetApps(configmanager.DefaultPartition, sparkQueueName)
+                       if err != nil {
+                               return false, err
+                       }
+                       if len(appsFromQueue) == 4 {
                                return true, nil
                        }
                        return false, err
@@ -138,7 +125,7 @@ var _ = Describe("", func() {
                By(fmt.Sprintf("Apps submitted are: %s", appIds))
 
                // Verify that all the spark jobs are scheduled and are in running state.
-               for _, id := range appIds {
+               for _, id := range appIds[1:] {
                        By(fmt.Sprintf("Verify driver pod for application %s 
has been created.", id))
                        err = kClient.WaitForPodBySelector(sparkNS, 
fmt.Sprintf("spark-app-selector=%s, spark-role=driver", id), 180*time.Second)
                        Ω(err).ShouldNot(HaveOccurred())
diff --git a/test/e2e/testdata/spark_jobs.sh b/test/e2e/testdata/spark_jobs.sh
index 04384523..ce09793f 100755
--- a/test/e2e/testdata/spark_jobs.sh
+++ b/test/e2e/testdata/spark_jobs.sh
@@ -22,42 +22,42 @@ if [[ $# -gt 8 ]]; then
   exit 1
 fi
 
-MASTER_URL=$1
-SPARK_IMAGE=$2
+SPARK_IMAGE=$1
+SPARK_PY_IMAGE=$2
 NAMESPACE=$3
 SVC_ACC=$4
 EXEC_COUNT=3
 END=${6:-3}
 
-if [[ -z "${SPARK_HOME}" ]]; then
-  SPARK_SUBMIT_CMD="spark-submit"
-else
-  SPARK_SUBMIT_CMD="${SPARK_HOME}/bin/spark-submit"
-fi
-
+kubectl run spark-client --image="$SPARK_IMAGE" -n "$NAMESPACE" --overrides="{\"spec\": {\"serviceAccountName\": \"$SVC_ACC\"}}" -- sleep infinity
+kubectl wait --for=condition=ready pod/spark-client -n "$NAMESPACE" --timeout=300s
+kubectl cp ../testdata/spark_pod_template.yaml spark-client:/tmp/spark_pod_template.yaml -n "$NAMESPACE"
+MASTER_URL=$(kubectl exec spark-client -n "$NAMESPACE" -- bash -c "echo \"https://\${KUBERNETES_SERVICE_HOST}:\${KUBERNETES_SERVICE_PORT}\"")
 for i in $(seq 1 "$END"); do
-  nohup "$SPARK_SUBMIT_CMD" \
-      --master k8s://"$MASTER_URL" \
-      --deploy-mode cluster \
-      --name spark-yk-example-"$i" \
-      --conf spark.executor.instances=$EXEC_COUNT \
-      --conf spark.kubernetes.container.image="$SPARK_IMAGE" \
-      --conf spark.kubernetes.authenticate.driver.serviceAccountName="$SVC_ACC" \
-      --conf spark.pyspark.python=python3 \
-      --conf spark.pyspark.driver.python=python3 \
-      --conf spark.kubernetes.file.upload.path=/opt/spark/upload-temp \
-      --conf spark.kubernetes.driver.podTemplateFile=../testdata/spark_pod_template.yaml \
-      --conf spark.kubernetes.executor.podTemplateFile=../testdata/spark_pod_template.yaml \
-      --conf spark.kubernetes.namespace="$NAMESPACE" \
-      --conf spark.kubernetes.driver.limit.cores=0.5 \
-      --conf spark.kubernetes.driver.request.cores=0.1 \
-      --conf spark.driver.memory=500m \
-      --conf spark.driver.memoryOverhead=500m \
-      --conf spark.kubernetes.executor.limit.cores=0.5 \
-      --conf spark.kubernetes.executor.request.cores=0.1 \
-      --conf spark.executor.memory=500m \
-      --conf spark.executor.memoryOverhead=500m \
-      local:///opt/spark/examples/src/main/python/pi.py \
-      100 &
+  CMD="kubectl exec spark-client -n $NAMESPACE -- bash -c \
+        \"/opt/spark/bin/spark-submit \
+        --master k8s://$MASTER_URL  \
+        --deploy-mode cluster \
+        --name spark-yk-example-$i \
+        --conf spark.executor.instances=$EXEC_COUNT \
+        --conf spark.kubernetes.container.image=$SPARK_PY_IMAGE \
+        --conf spark.kubernetes.authenticate.driver.serviceAccountName=$SVC_ACC \
+        --conf spark.pyspark.python=python3 \
+        --conf spark.pyspark.driver.python=python3 \
+        --conf spark.kubernetes.file.upload.path=/opt/spark/upload-temp \
+        --conf spark.kubernetes.driver.podTemplateFile=/tmp/spark_pod_template.yaml \
+        --conf spark.kubernetes.executor.podTemplateFile=/tmp/spark_pod_template.yaml \
+        --conf spark.kubernetes.namespace=$NAMESPACE \
+        --conf spark.kubernetes.driver.limit.cores=0.5 \
+        --conf spark.kubernetes.driver.request.cores=0.1 \
+        --conf spark.driver.memory=500m \
+        --conf spark.driver.memoryOverhead=500m \
+        --conf spark.kubernetes.executor.limit.cores=0.5 \
+        --conf spark.kubernetes.executor.request.cores=0.1 \
+        --conf spark.executor.memory=500m \
+        --conf spark.executor.memoryOverhead=500m \
+        local:///opt/spark/examples/src/main/python/pi.py \
+        100\""
+  eval "$CMD" &
   sleep 2
 done;


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org
