Thanks Nimrod! Yes, the checkpoint is in GCS and, as you pointed out, the service account being used did not have access to the GCS storage bucket. I was able to add the permissions and the GCS bucket is accessible now.
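(For anyone else hitting the same 403, the kind of bucket-level grant that addresses it looks roughly like the sketch below. The bucket name and service-account email are placeholders, and the role is illustrative: roles/storage.objectAdmin covers reading/writing objects such as checkpoints and event logs, while storage.buckets.create would additionally require roles/storage.admin.)

```
# Illustrative only - grant the GCS service account access to the bucket(s)
# used for checkpoints / event logs; substitute the real bucket and SA email.
gcloud storage buckets add-iam-policy-binding gs://<bucket-name> \
  --member="serviceAccount:<service-account-email>" \
  --role="roles/storage.objectAdmin"
```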
However, here is another issue: I've mounted a secret 'spark-gcs-creds', which contains the key for the service account ('spark') used by the application, but the secret is not getting mounted. Here is the error:

```
24/10/05 20:10:36 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
24/10/05 20:10:38 WARN FileSystem: Failed to initialize fileystem gs://vkp-spark-history-server/spark-events: java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such file or directory)
24/10/05 20:10:38 ERROR SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such file or directory)
    at java.base/java.io.FileInputStream.open0(Native Method)
    at java.base/java.io.FileInputStream.open(Unknown Source)
    at java.base/java.io.FileInputStream.<init>(Unknown Source)
    at java.base/java.io.FileInputStream.<init>(Unknown Source)
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromJsonKeyFile(CredentialFactory.java:297)
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:414)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1479)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1638)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1620)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:507)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
```

Here is the job yaml:

```
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: structured-streaming-350-{{ now | unixEpoch }}
  namespace: so350
spec:
  type: Python
  mode: cluster
  sparkVersion: "3.5.0"
  image: "us-east1-docker.pkg.dev/versa-kafka-poc/spark-job-repo/ss-main-so350:3.5.0"
  imagePullPolicy: Always
  imagePullSecrets:
    - gcr-json-key
  mainApplicationFile: "local:///opt/spark/custom-dir/main.py"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: {{ .Values.driver.cores }}
    coreLimit: "{{ .Values.driver.coreLimit }}"
    memory: "{{ .Values.driver.memory }}"
    labels:
      version: 3.5.0
    serviceAccount: spark
    securityContext:
      runAsUser: 185   # UID for spark user
    volumeMounts:
      - name: gcs-key
        mountPath: /mnt/secrets
        readOnly: true
    initContainers:
      - name: init1
        image: busybox:1.28
        command: ['sh', '-c', "until nslookup myservice.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done"]
      - name: init2
        image: busybox
        command:
          - /bin/sh
          - "-c"
          - |
            echo "Checking mounted secrets";
            ls -l /mnt/secrets;
            cat /mnt/secrets/spark-gcs-key.json;
            sleep 180;   # Keep the init container running for inspection
  executor:
    cores: {{ .Values.executor.cores }}
    instances: {{ .Values.executor.instances }}
    memory: "{{ .Values.executor.memory }}"
    labels:
      version: "3.5.0"
    securityContext:
      runAsUser: 185   # UID for spark user
    volumeMounts:
      - name: gcs-key
        mountPath: /mnt/secrets
        readOnly: true
  volumes:
    - name: gcs-key
      secret:
        secretName: spark-gcs-creds
  deps:
    jars:
      - local:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar
      - local:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar
      - local:///opt/spark/other-jars/bson-4.0.5.jar
      - local:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar
      - local:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar
      - local:///opt/spark/other-jars/spark-sql-kafka-0-10_2.12-3.5.0.jar
      - local:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar
      - local:///opt/spark/other-jars/spark-token-provider-kafka-0-10_2.12-3.5.0.jar
      - local:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar
      - local:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar
      - local:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar
      - local:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar
      - local:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar
      - local:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar
      - local:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar
      - local:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar
      - local:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar
    pyFiles:
      - local:///opt/spark/zips/streams.zip
      - local:///opt/spark/zips/utils.zip
    files:
      - local:///opt/spark/certs/syslog-vani-prefix.p12
      - local:///opt/spark/certs/versa-alarmblock-test-user.p12
      - local:///opt/spark/certs/versa-appstat-test-user.p12
      - local:///opt/spark/certs/versa-bandwidth-test-user.p12
      - local:///opt/spark/certs/intfutil-user-test.p12
      - local:///opt/spark/certs/alarm-compression-user-test.p12
      - local:///opt/spark/certs/alarmblock-user-test.p12
      - local:///opt/spark/certs/appstat-agg-user-test.p12
      - local:///opt/spark/certs/appstat-anomaly-user-test.p12
      - local:///opt/spark/certs/appstats-user-test.p12
      - local:///opt/spark/certs/insights-user-test.p12
      - local:///opt/spark/cfg/params.cfg
      - local:///opt/spark/cfg/params_password.cfg
  pythonVersion: "{{ .Values.pythonVersion }}"
  sparkConf:
    "spark.hadoop.google.cloud.project.id": "versa-kafka-poc"
    "spark.hadoop.fs.gs.project.id": "versa-kafka-poc"
    "spark.kafka.ssl.keystore.location": "/opt/spark/certs/syslog-vani-prefix.p12"
    "spark.kafka.ssl.truststore.location": "/opt/spark/certs/versa-kafka-poc-tf-ca.p12"
    "spark.kubernetes.namespace": "so350"
    "spark.kubernetes.authenticate.driver.serviceAccountName": "spark"
    "spark.kubernetes.container.image": "{{ .Values.image }}"
    "spark.kubernetes.driver.container.image": "{{ .Values.image }}"
    "spark.kubernetes.executor.container.image": "{{ .Values.image }}"
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
    "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
    "spark.hadoop.google.cloud.auth.service.account.enable": "true"
    "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "/mnt/secrets/spark-gcs-key.json"
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "{{ .Values.sparkEventLogDir }}"
    "spark.hadoop.fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
    "spark.dynamicAllocation.enabled": "false"
    "spark.dynamicAllocation.executorIdleTimeout": "120s"
    "spark.shuffle.service.enabled": "false"
    "spark.kubernetes.executor.deleteOnTermination": "false"
    "spark.jars": "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
    "spark.driver.extraClassPath": "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
    "spark.executor.extraClassPath": "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
    "spark.submit.pyFiles": "file:///opt/spark/zips/streams.zip,file:///opt/spark/zips/utils.zip"
  hadoopConf:
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
    "google.cloud.auth.service.account.enable": "true"
    "google.cloud.auth.service.account.json.keyfile": "/mnt/secrets/spark-gcs-key.json"
  arguments:
    - "--isdebug={{ .Values.isdebug }}"
    - "--istest={{ .Values.isdebug }}"
```

Here is a snapshot of the secret:

```
(base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret spark-gcs-creds -n so350 -o yaml
apiVersion: v1
data:
  spark-gcs-key.json: <--- KEY --->
kind: Secret
metadata:
  creationTimestamp: "2024-10-04T21:54:06Z"
  name: spark-gcs-creds
  namespace: so350
  resourceVersion: "180991552"
  uid: ac30c575-9abf-4a77-ba90-15576607c97f
type: Opaque
```

Any ideas on how to debug/fix this? tia!
On Thu, Oct 3, 2024 at 12:37 AM Nimrod Ofek <ofek.nim...@gmail.com> wrote:

> Where is the checkpoint location? Not in GCS?
> Probably the location of the checkpoint is there - and you don't have
> permissions for that...
>
> On Thu, Oct 3, 2024 at 02:43, karan alang <karan.al...@gmail.com> wrote:
>
>> This seems to be the cause of this ->
>> github.com/kubeflow/spark-operator/issues/1619 .. the secret is not
>> getting mounted due to this error -> MountVolume.SetUp failed for volume
>> "spark-conf-volume-driver"
>>
>> I'm getting the same error in the event logs, and the mounted secret is
>> not getting read.
>>
>> If anyone is using spark-operator and has run into this issue, pls let me
>> know.
>> Alternatively, is there another operator that can be used?
>>
>> thanks!
>>
>>
>> On Tue, Oct 1, 2024 at 3:41 PM karan alang <karan.al...@gmail.com> wrote:
>>
>>> I have the kubeflow spark-operator installed on K8s (GKE), and I'm
>>> running a structured streaming job which reads data from kafka .. the
>>> job is run every 10 mins.
>>>
>>> It is giving the error shown below:
>>>
>>> ```
>>> Traceback (most recent call last):
>>>   File "/opt/spark/custom-dir/main.py", line 356, in <module>
>>>     sys.exit(main())
>>>   File "/opt/spark/custom-dir/main.py", line 352, in main
>>>     ss.readData()
>>>   File "/opt/spark/custom-dir/main.py", line 327, in readData
>>>     query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line 1491, in start
>>>     return self._sq(self._jwrite.start())
>>>   File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1304, in __call__
>>>     return_value = get_return_value(
>>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 111, in deco
>>>     return f(*a, **kw)
>>>   File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326, in get_return_value
>>>     raise Py4JJavaError(
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
>>> : com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
>>> POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
>>> {
>>>   "code" : 403,
>>>   "errors" : [ {
>>>     "domain" : "global",
>>>     "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
>>>     "reason" : "forbidden"
>>>   } ],
>>>   "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist)."
>>> }
>>> ```
>>>
>>> Code reading data from kafka on GKE:
>>>
>>> ```
>>> df_stream = self.spark.readStream.format('kafka') \
>>>     .option("kafka.security.protocol", "SSL") \
>>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
>>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
>>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>     .option("subscribePattern", "topic") \
>>>     .option("startingOffsets", "latest") \
>>>     .option("failOnDataLoss", "false") \
>>>     .option("kafka.metadata.max.age.ms", "1000") \
>>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>>     .load()
>>>
>>> logger.info(f" df_stream, calling convertToDictForEachBatch -> {df_stream}")
>>> # trigger once
>>>
>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>     .outputMode("append") \
>>>     .trigger(processingTime='10 minutes') \
>>>     .option("truncate", "false") \
>>>     .option("checkpointLocation", self.checkpoint) \
>>>     .foreachBatch(self.convertToDictForEachBatch) \
>>>     .start()
>>> ```
>>>
>>> I'm unable to understand why the error states the storage.buckets.create
>>> privilege is missing when the code is actually reading data from kafka.
>>>
>>> Any ideas on how to debug/fix?
>>>
>>> Has anyone used the kubeflow spark-operator (v3.1.1) for streaming jobs
>>> on kubernetes?
>>>
>>> tia!
>>>
>>> here is the stack overflow ticket ->
>>>
>>> https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea