This is an automated email from the ASF dual-hosted git repository. ccondit pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push: new a1b66d53 [YUNIKORN-2875] E2E: Launch spark from official image to avoid client java dependency (#956) a1b66d53 is described below commit a1b66d53be946a2fb63a32ea694742073366f48a Author: kaichiachen <kaichia...@gmail.com> AuthorDate: Tue Mar 11 18:01:33 2025 -0500 [YUNIKORN-2875] E2E: Launch spark from official image to avoid client java dependency (#956) Closes: #956 Signed-off-by: Craig Condit <ccon...@apache.org> --- Makefile | 19 ++----- .../spark_jobs_scheduling_test.go | 37 +++++-------- test/e2e/testdata/spark_jobs.sh | 62 +++++++++++----------- 3 files changed, 47 insertions(+), 71 deletions(-) diff --git a/Makefile b/Makefile index f73963cb..6fde1a02 100644 --- a/Makefile +++ b/Makefile @@ -191,13 +191,11 @@ HELM_ARCHIVE_BASE=$(OS)-$(EXEC_ARCH) export PATH := $(BASE_DIR)/$(HELM_PATH):$(PATH) # spark -export SPARK_VERSION=3.3.3 +export SPARK_VERSION=3.5.5-java17 # sometimes the image is not avaiable with $SPARK_VERSION, the minor version must match -export SPARK_PYTHON_VERSION=3.3.1 -export SPARK_HOME=$(BASE_DIR)$(TOOLS_DIR)/spark-v$(SPARK_VERSION) -export SPARK_SUBMIT_CMD=$(SPARK_HOME)/bin/spark-submit +export SPARK_PYTHON_VERSION=3.4.0 +export SPARK_IMAGE=apache/spark:$(SPARK_VERSION) export SPARK_PYTHON_IMAGE=docker.io/apache/spark-py:v$(SPARK_PYTHON_VERSION) -export PATH := $(SPARK_HOME):$(PATH) # go-licenses GO_LICENSES_VERSION=v1.6.0 @@ -270,7 +268,7 @@ print_helm_version: # Install tools .PHONY: tools -tools: $(SHELLCHECK_BIN) $(GOLANGCI_LINT_BIN) $(KUBECTL_BIN) $(KIND_BIN) $(HELM_BIN) $(SPARK_SUBMIT_CMD) $(GO_LICENSES_BIN) $(GINKGO_BIN) +tools: $(SHELLCHECK_BIN) $(GOLANGCI_LINT_BIN) $(KUBECTL_BIN) $(KIND_BIN) $(HELM_BIN) $(GO_LICENSES_BIN) $(GINKGO_BIN) # Install shellcheck $(SHELLCHECK_BIN): @@ -309,15 +307,6 @@ $(HELM_BIN): @curl -sSfL "https://get.helm.sh/$(HELM_ARCHIVE)" \ | tar -x -z --strip-components=1 -C "$(HELM_PATH)" "$(HELM_ARCHIVE_BASE)/helm" -# Install spark -$(SPARK_SUBMIT_CMD): - @echo "installing spark v$(SPARK_VERSION)" - @rm -rf "$(SPARK_HOME)" "$(SPARK_HOME).tmp" - @mkdir -p "$(SPARK_HOME).tmp" - @curl -sSfL "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz" \ - | tar -x -z --strip-components=1 -C "$(SPARK_HOME).tmp" - @mv -f "$(SPARK_HOME).tmp" "$(SPARK_HOME)" - # Install go-licenses $(GO_LICENSES_BIN): @echo "installing go-licenses $(GO_LICENSES_VERSION)" diff --git a/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go b/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go index 4bd2ad21..3c4a661b 100644 --- a/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go +++ b/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go @@ -21,7 +21,6 @@ package spark_jobs_scheduling import ( "context" "fmt" - "net/url" "os" "os/exec" "sort" @@ -29,7 +28,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/rest" "github.com/apache/yunikorn-core/pkg/webservice/dao" tests "github.com/apache/yunikorn-k8shim/test/e2e" @@ -46,16 +44,17 @@ var _ = Describe("", func() { var exErr error var sparkNS = "spark-" + common.RandSeq(10) var svcAcc = "svc-acc-" + common.RandSeq(10) - var config *rest.Config - var masterURL string var roleName = "spark-jobs-role-" + common.RandSeq(5) var clusterEditRole = "edit" - var sparkImage = os.Getenv("SPARK_PYTHON_IMAGE") + var sparkImage = os.Getenv("SPARK_IMAGE") + var sparkPyImage = os.Getenv("SPARK_PYTHON_IMAGE") var sparkExecutorCount = 3 BeforeEach(func() { By(fmt.Sprintf("Spark image is: %s", sparkImage)) Ω(sparkImage).NotTo(BeEmpty()) + By(fmt.Sprintf("Spark_py image is: %s", sparkPyImage)) + Ω(sparkPyImage).NotTo(BeEmpty()) kClient = k8s.KubeCtl{} Ω(kClient.SetClient()).To(BeNil()) Ω(exErr).NotTo(HaveOccurred()) @@ -71,21 +70,6 @@ var _ = Describe("", func() { By(fmt.Sprintf("Creating cluster role binding: %s for spark jobs", roleName)) _, err = kClient.CreateClusterRoleBinding(roleName, clusterEditRole, sparkNS, svcAcc) Ω(err).NotTo(HaveOccurred()) - - config, err = kClient.GetKubeConfig() - Ω(err).NotTo(HaveOccurred()) - - u, err := url.Parse(config.Host) - Ω(err).NotTo(HaveOccurred()) - port := u.Port() - if port == "" { - port = "443" - if u.Scheme == "http" { - port = "80" - } - } - masterURL = u.Scheme + "://" + u.Hostname() + ":" + port - By(fmt.Sprintf("MasterURL info is %s ", masterURL)) }) It("Test_With_Spark_Jobs", func() { @@ -93,8 +77,8 @@ var _ = Describe("", func() { err := exec.Command( "bash", "../testdata/spark_jobs.sh", - masterURL, sparkImage, + sparkPyImage, sparkNS, svcAcc, string(rune(sparkExecutorCount))).Run() @@ -110,12 +94,15 @@ var _ = Describe("", func() { err = restClient.WaitforQueueToAppear(configmanager.DefaultPartition, sparkQueueName, 120) Ω(err).NotTo(HaveOccurred()) - By(fmt.Sprintf("Get apps from specific queue: %s", sparkNS)) + By(fmt.Sprintf("Get apps from specific queue: %s", sparkQueueName)) var appsFromQueue []*dao.ApplicationDAOInfo // Poll for apps to appear in the queue err = wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, time.Duration(120)*time.Second, false, func(context.Context) (done bool, err error) { - appsFromQueue, err = restClient.GetApps(configmanager.DefaultPartition, configmanager.RootQueue+"."+sparkNS) - if len(appsFromQueue) == 3 { + appsFromQueue, err = restClient.GetApps(configmanager.DefaultPartition, sparkQueueName) + if err != nil { + return false, err + } + if len(appsFromQueue) == 4 { return true, nil } return false, err @@ -138,7 +125,7 @@ var _ = Describe("", func() { By(fmt.Sprintf("Apps submitted are: %s", appIds)) // Verify that all the spark jobs are scheduled and are in running state. - for _, id := range appIds { + for _, id := range appIds[1:] { By(fmt.Sprintf("Verify driver pod for application %s has been created.", id)) err = kClient.WaitForPodBySelector(sparkNS, fmt.Sprintf("spark-app-selector=%s, spark-role=driver", id), 180*time.Second) Ω(err).ShouldNot(HaveOccurred()) diff --git a/test/e2e/testdata/spark_jobs.sh b/test/e2e/testdata/spark_jobs.sh index 04384523..ce09793f 100755 --- a/test/e2e/testdata/spark_jobs.sh +++ b/test/e2e/testdata/spark_jobs.sh @@ -22,42 +22,42 @@ if [[ $# -gt 8 ]]; then exit 1 fi -MASTER_URL=$1 -SPARK_IMAGE=$2 +SPARK_IMAGE=$1 +SPARK_PY_IMAGE=$2 NAMESPACE=$3 SVC_ACC=$4 EXEC_COUNT=3 END=${6:-3} -if [[ -z "${SPARK_HOME}" ]]; then - SPARK_SUBMIT_CMD="spark-submit" -else - SPARK_SUBMIT_CMD="${SPARK_HOME}/bin/spark-submit" -fi - +kubectl run spark-client --image="$SPARK_IMAGE" -n "$NAMESPACE" --overrides="{\"spec\": {\"serviceAccountName\": \"$SVC_ACC\"}}" -- sleep infinity +kubectl wait --for=condition=ready pod/spark-client -n "$NAMESPACE" --timeout=300s +kubectl cp ../testdata/spark_pod_template.yaml spark-client:/tmp/spark_pod_template.yaml -n "$NAMESPACE" +MASTER_URL=$(kubectl exec spark-client -n "$NAMESPACE" -- bash -c "echo \"https://\${KUBERNETES_SERVICE_HOST}:\${KUBERNETES_SERVICE_PORT}\"") for i in $(seq 1 "$END"); do - nohup "$SPARK_SUBMIT_CMD" \ - --master k8s://"$MASTER_URL" \ - --deploy-mode cluster \ - --name spark-yk-example-"$i" \ - --conf spark.executor.instances=$EXEC_COUNT \ - --conf spark.kubernetes.container.image="$SPARK_IMAGE" \ - --conf spark.kubernetes.authenticate.driver.serviceAccountName="$SVC_ACC" \ - --conf spark.pyspark.python=python3 \ - --conf spark.pyspark.driver.python=python3 \ - --conf spark.kubernetes.file.upload.path=/opt/spark/upload-temp \ - --conf spark.kubernetes.driver.podTemplateFile=../testdata/spark_pod_template.yaml \ - --conf spark.kubernetes.executor.podTemplateFile=../testdata/spark_pod_template.yaml \ - --conf spark.kubernetes.namespace="$NAMESPACE" \ - --conf spark.kubernetes.driver.limit.cores=0.5 \ - --conf spark.kubernetes.driver.request.cores=0.1 \ - --conf spark.driver.memory=500m \ - --conf spark.driver.memoryOverhead=500m \ - --conf spark.kubernetes.executor.limit.cores=0.5 \ - --conf spark.kubernetes.executor.request.cores=0.1 \ - --conf spark.executor.memory=500m \ - --conf spark.executor.memoryOverhead=500m \ - local:///opt/spark/examples/src/main/python/pi.py \ - 100 & + CMD="kubectl exec spark-client -n $NAMESPACE -- bash -c \ + \"/opt/spark/bin/spark-submit \ + --master k8s://$MASTER_URL \ + --deploy-mode cluster \ + --name spark-yk-example-$i \ + --conf spark.executor.instances=$EXEC_COUNT \ + --conf spark.kubernetes.container.image=$SPARK_PY_IMAGE \ + --conf spark.kubernetes.authenticate.driver.serviceAccountName=$SVC_ACC \ + --conf spark.pyspark.python=python3 \ + --conf spark.pyspark.driver.python=python3 \ + --conf spark.kubernetes.file.upload.path=/opt/spark/upload-temp \ + --conf spark.kubernetes.driver.podTemplateFile=/tmp/spark_pod_template.yaml \ + --conf spark.kubernetes.executor.podTemplateFile=/tmp/spark_pod_template.yaml \ + --conf spark.kubernetes.namespace=$NAMESPACE \ + --conf spark.kubernetes.driver.limit.cores=0.5 \ + --conf spark.kubernetes.driver.request.cores=0.1 \ + --conf spark.driver.memory=500m \ + --conf spark.driver.memoryOverhead=500m \ + --conf spark.kubernetes.executor.limit.cores=0.5 \ + --conf spark.kubernetes.executor.request.cores=0.1 \ + --conf spark.executor.memory=500m \ + --conf spark.executor.memoryOverhead=500m \ + local:///opt/spark/examples/src/main/python/pi.py \ + 100\"" + eval "$CMD" & sleep 2 done; --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org