Hi Karan,

It seems the way you tried to integrate with GCP is not the way Kubeflow proposes; for some reason they define a different mechanism for passing the secret. Here <https://www.kubeflow.org/docs/components/spark-operator/user-guide/gcp/> you can see how they suggest doing it.
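The idea on that page is to declare the secret under `secrets` with `secretType: GCPServiceAccount` instead of hand-rolling `volumes`/`volumeMounts`. A rough, untested sketch using your secret name (note: if I remember the guide correctly, the operator assumes the JSON key inside the secret is named key.json, so you would need to recreate spark-gcs-creds with that key name; the operator should then mount it on the driver and executor pods and point GOOGLE_APPLICATION_CREDENTIALS at it for you):

```
spec:
  driver:
    secrets:
      # hypothetical adaptation of the GCP guide to your secret name
      - name: spark-gcs-creds
        path: /mnt/secrets
        secretType: GCPServiceAccount
  executor:
    secrets:
      - name: spark-gcs-creds
        path: /mnt/secrets
        secretType: GCPServiceAccount
```

If that works, you can probably drop the explicit volumes/volumeMounts from the spec entirely.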
HTH,
Nimrod

On Sun, Oct 6, 2024 at 9:37 PM karan alang <karan.al...@gmail.com> wrote:

> Hi Nimrod,
>
> Thanks for the response.
> The volume gcs-key (representing the secret spark-gcs-creds) is shared
> between the driver & executor, both of which have the volumeMounts
> (screenshot attached).
> [image: Screenshot 2024-10-06 at 11.35.19 AM.png]
> 'volumes' is defined at the same level as driver & executor.
> Does this need to be changed?
>
> The volume spark-gcs-creds does have spark-gcs-key.json, as shown in the
> snapshot below:
>
> (base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret spark-gcs-creds -n so350 -o yaml
>
> apiVersion: v1
> data:
>   spark-gcs-key.json:
>     <--- KEY --->
> kind: Secret
> metadata:
>   creationTimestamp: "2024-10-04T21:54:06Z"
>   name: spark-gcs-creds
>   namespace: so350
>   resourceVersion: "180991552"
>   uid: ac30c575-9abf-4a77-ba90-15576607c97f
> type: Opaque
>
> Any feedback on this?
> tia!
>
>
> On Sun, Oct 6, 2024 at 1:16 AM Nimrod Ofek <ofek.nim...@gmail.com> wrote:
>
>> Hi Karan,
>>
>> In the executor I see you have a volume (and not just volumeMounts) - I
>> don't see a volume in the driver, so you probably don't have such a
>> volume there.
>> Also, please check whether the secret has spark-gcs-key.json in its data
>> section - you can check it with:
>> describe secret <secret-name>
>>
>> HTH,
>> Nimrod
>>
>>
>> On Sat, Oct 5, 2024 at 11:20 PM karan alang <karan.al...@gmail.com>
>> wrote:
>>
>>> Thanks Nimrod!
>>> Yes, the checkpoint is in GCS and, as you pointed out, the service
>>> account being used did not have access to the GCS storage bucket. I was
>>> able to add the permissions, and the GCS bucket is accessible now.
>>>
>>> However, here is another issue - I've mounted a secret 'spark-gcs-creds'
>>> which contains the key for the service account 'spark' used by the
>>> application; however, the secret is not getting mounted.
>>>
>>> Here is the error:
>>> ```
>>> 24/10/05 20:10:36 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
>>>
>>> 24/10/05 20:10:38 WARN FileSystem: Failed to initialize fileystem gs://vkp-spark-history-server/spark-events: java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such file or directory)
>>>
>>> 24/10/05 20:10:38 ERROR SparkContext: Error initializing SparkContext.
>>> java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such file or directory)
>>>     at java.base/java.io.FileInputStream.open0(Native Method)
>>>     at java.base/java.io.FileInputStream.open(Unknown Source)
>>>     at java.base/java.io.FileInputStream.<init>(Unknown Source)
>>>     at java.base/java.io.FileInputStream.<init>(Unknown Source)
>>>     at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromJsonKeyFile(CredentialFactory.java:297)
>>>     at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:414)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1479)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1638)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1620)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:507)
>>>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
>>>     at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
>>>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
>>>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
>>> ```
>>>
>>> Here is the job yaml ->
>>>
>>> apiVersion: "sparkoperator.k8s.io/v1beta2"
>>> kind: SparkApplication
>>> metadata:
>>>   name: structured-streaming-350-{{ now | unixEpoch }}
>>>   namespace: so350
>>> spec:
>>>   type: Python
>>>   mode: cluster
>>>   sparkVersion: "3.5.0"
>>>   image: "us-east1-docker.pkg.dev/versa-kafka-poc/spark-job-repo/ss-main-so350:3.5.0"
>>>   imagePullPolicy: Always
>>>   imagePullSecrets:
>>>     - gcr-json-key
>>>   mainApplicationFile: "local:///opt/spark/custom-dir/main.py"
>>>   restartPolicy:
>>>     type: OnFailure
>>>     onFailureRetries: 3
>>>     onFailureRetryInterval: 10
>>>     onSubmissionFailureRetries: 5
>>>     onSubmissionFailureRetryInterval: 20
>>>   driver:
>>>     cores: {{ .Values.driver.cores }}
>>>     coreLimit: "{{ .Values.driver.coreLimit }}"
>>>     memory: "{{ .Values.driver.memory }}"
>>>     labels:
>>>       version: 3.5.0
>>>     serviceAccount: spark
>>>     securityContext:
>>>       runAsUser: 185  # UID for spark user
>>>     volumeMounts:
>>>       - name: gcs-key
>>>         mountPath: /mnt/secrets
>>>         readOnly: true
>>>     initContainers:
>>>       - name: init1
>>>         image: busybox:1.28
>>>         command: ['sh', '-c', "until nslookup myservice.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done"]
>>>       - name: init2
>>>         image: busybox
>>>         command:
>>>           - /bin/sh
>>>           - "-c"
>>>           - |
>>>             echo "Checking mounted secrets";
>>>             ls -l /mnt/secrets;
>>>             cat /mnt/secrets/spark-gcs-key.json;
>>>             sleep 180; # Keep the init container running for inspection
>>>   executor:
>>>     cores: {{ .Values.executor.cores }}
>>>     instances: {{ .Values.executor.instances }}
>>>     memory: "{{ .Values.executor.memory }}"
>>>     labels:
>>>       version: "3.5.0"
>>>     securityContext:
>>>       runAsUser: 185  # UID for spark user
>>>     volumeMounts:
>>>       - name: gcs-key
>>>         mountPath: /mnt/secrets
>>>         readOnly: true
>>>   volumes:
>>>     - name: gcs-key
>>>       secret:
>>>         secretName: spark-gcs-creds
>>>   deps:
>>>     jars:
>>>       - local:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar
>>>       - local:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar
>>>       - local:///opt/spark/other-jars/bson-4.0.5.jar
>>>       - local:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar
>>>       - local:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar
>>>       - local:///opt/spark/other-jars/spark-sql-kafka-0-10_2.12-3.5.0.jar
>>>       - local:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar
>>>       - local:///opt/spark/other-jars/spark-token-provider-kafka-0-10_2.12-3.5.0.jar
>>>       - local:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar
>>>       - local:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar
>>>       - local:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar
>>>       - local:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar
>>>       - local:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar
>>>       - local:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar
>>>       - local:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar
>>>       - local:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar
>>>       - local:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar
>>>     pyFiles:
>>>       - local:///opt/spark/zips/streams.zip
>>>       - local:///opt/spark/zips/utils.zip
>>>     files:
>>>       - local:///opt/spark/certs/syslog-vani-prefix.p12
>>>       - local:///opt/spark/certs/versa-alarmblock-test-user.p12
>>>       - local:///opt/spark/certs/versa-appstat-test-user.p12
>>>       - local:///opt/spark/certs/versa-bandwidth-test-user.p12
>>>       - local:///opt/spark/certs/intfutil-user-test.p12
>>>       - local:///opt/spark/certs/alarm-compression-user-test.p12
>>>       - local:///opt/spark/certs/alarmblock-user-test.p12
>>>       - local:///opt/spark/certs/appstat-agg-user-test.p12
>>>       - local:///opt/spark/certs/appstat-anomaly-user-test.p12
>>>       - local:///opt/spark/certs/appstats-user-test.p12
>>>       - local:///opt/spark/certs/insights-user-test.p12
>>>       - local:///opt/spark/cfg/params.cfg
>>>       - local:///opt/spark/cfg/params_password.cfg
>>>   pythonVersion: "{{ .Values.pythonVersion }}"
>>>   sparkConf:
>>>     "spark.hadoop.google.cloud.project.id": "versa-kafka-poc"
>>>     "spark.hadoop.fs.gs.project.id": "versa-kafka-poc"
>>>     "spark.kafka.ssl.keystore.location": "/opt/spark/certs/syslog-vani-prefix.p12"
>>>     "spark.kafka.ssl.truststore.location": "/opt/spark/certs/versa-kafka-poc-tf-ca.p12"
>>>     "spark.kubernetes.namespace": "so350"
>>>     "spark.kubernetes.authenticate.driver.serviceAccountName": "spark"
>>>     "spark.kubernetes.container.image": "{{ .Values.image }}"
>>>     "spark.kubernetes.driver.container.image": "{{ .Values.image }}"
>>>     "spark.kubernetes.executor.container.image": "{{ .Values.image }}"
>>>     "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
>>>     "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
>>>     "spark.hadoop.google.cloud.auth.service.account.enable": "true"
>>>     "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "/mnt/secrets/spark-gcs-key.json"
>>>     "spark.eventLog.enabled": "true"
>>>     "spark.eventLog.dir": "{{ .Values.sparkEventLogDir }}"
>>>     "spark.hadoop.fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
>>>     "spark.dynamicAllocation.enabled": "false"
>>>     "spark.dynamicAllocation.executorIdleTimeout": "120s"
>>>     "spark.shuffle.service.enabled": "false"
>>>     "spark.kubernetes.executor.deleteOnTermination": "false"
>>>     "spark.jars":
"file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar" >>> "spark.driver.extraClassPath": >>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar" >>> "spark.executor.extraClassPath": >>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar" >>> "spark.submit.pyFiles": >>> 
"file:///opt/spark/zips/streams.zip,file:///opt/spark/zips/utils.zip" >>> hadoopConf: >>> "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" >>> "fs.AbstractFileSystem.gs.impl": >>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS" >>> "google.cloud.auth.service.account.enable": "true" >>> "google.cloud.auth.service.account.json.keyfile": >>> "/mnt/secrets/spark-gcs-key.json" >>> arguments: >>> - "--isdebug={{ .Values.isdebug }}" >>> - "--istest={{ .Values.isdebug }}" >>> >>> here is snapshot of the secret : >>> >>> ``` >>> >>> (base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret >>> spark-gcs-creds -n so350 -o yaml >>> >>> apiVersion: v1 >>> >>> data: >>> >>> spark-gcs-key.json: >>> >>> <--- KEY ---> >>> >>> kind: Secret >>> >>> metadata: >>> >>> creationTimestamp: "2024-10-04T21:54:06Z" >>> >>> name: spark-gcs-creds >>> >>> namespace: so350 >>> >>> resourceVersion: "180991552" >>> >>> uid: ac30c575-9abf-4a77-ba90-15576607c97f >>> >>> type: Opaque >>> >>> ``` >>> >>> Any ideas on how to debug/fix this ? >>> >>> tia! >>> >>> >>> >>> On Thu, Oct 3, 2024 at 12:37 AM Nimrod Ofek <ofek.nim...@gmail.com> >>> wrote: >>> >>>> Where is the checkpoint location? Not in GCS? >>>> Probably the location of the checkpoint is there- and you don't have >>>> permissions for that... >>>> >>>> בתאריך יום ה׳, 3 באוק׳ 2024, 02:43, מאת karan alang < >>>> karan.al...@gmail.com>: >>>> >>>>> This seems to be the cause of this -> >>>>> github.com/kubeflow/spark-operator/issues/1619 .. the secret is not >>>>> getting mounted die to this error -> MountVolume.SetUp failed for volume >>>>> “spark-conf-volume-driver >>>>> >>>>> I'm getting same error in event logs, and the secret mounted is not >>>>> getting read >>>>> >>>>> If anyone is using spark-operator, and run into this issue .. pls let >>>>> me know. >>>>> Alternatively, is there another operator that can be used ? >>>>> >>>>> thanks! >>>>> >>>>> >>>>> On Tue, Oct 1, 2024 at 3:41 PM karan alang <karan.al...@gmail.com> >>>>> wrote: >>>>> >>>>>> I've kubeflow spark-operator installed on K8s (GKE), and i'm running >>>>>> a structured streaming job which reads data from kafka .. the job is run >>>>>> every 10 mins. >>>>>> >>>>>> It is giving an error shown below: >>>>>> >>>>>> ``` >>>>>> >>>>>> Traceback (most recent call last): >>>>>> File "/opt/spark/custom-dir/main.py", line 356, in <module> >>>>>> sys.exit(main()) >>>>>> File "/opt/spark/custom-dir/main.py", line 352, in main >>>>>> ss.readData() >>>>>> File "/opt/spark/custom-dir/main.py", line 327, in readData >>>>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", >>>>>> "topic").writeStream \ >>>>>> File >>>>>> "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line >>>>>> 1491, in start >>>>>> return self._sq(self._jwrite.start()) >>>>>> File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", >>>>>> line 1304, in __call__ >>>>>> return_value = get_return_value( >>>>>> File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", >>>>>> line 111, in deco >>>>>> return f(*a, **kw) >>>>>> File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line >>>>>> 326, in get_return_value >>>>>> raise Py4JJavaError( >>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o103.start. 
>>>>>> : >>>>>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: >>>>>> 403 Forbidden >>>>>> POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc >>>>>> { >>>>>> "code" : 403, >>>>>> "errors" : [ { >>>>>> "domain" : "global", >>>>>> "message" : >>>>>> "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have >>>>>> storage.buckets.create access to the Google Cloud project. Permission >>>>>> 'storage.buckets.create' denied on resource (or it may not exist).", >>>>>> "reason" : "forbidden" >>>>>> } ], >>>>>> "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com >>>>>> does not have storage.buckets.create access to the Google Cloud project. >>>>>> Permission 'storage.buckets.create' denied on resource (or it may not >>>>>> exist)." >>>>>> } >>>>>> >>>>>> >>>>>> ``` >>>>>> >>>>>> Code reading data from kafka on GKE : >>>>>> >>>>>> >>>>>> ``` >>>>>> >>>>>> df_stream = self.spark.readStream.format('kafka') \ >>>>>> .option("kafka.security.protocol", "SSL") \ >>>>>> .option("kafka.ssl.truststore.location", >>>>>> self.ssl_truststore_location) \ >>>>>> .option("kafka.ssl.truststore.password", >>>>>> self.ssl_truststore_password) \ >>>>>> .option("kafka.ssl.keystore.location", >>>>>> self.ssl_keystore_location) \ >>>>>> .option("kafka.ssl.keystore.password", >>>>>> self.ssl_keystore_password) \ >>>>>> .option("kafka.bootstrap.servers", self.kafkaBrokers) \ >>>>>> .option("subscribePattern", "topic") \ >>>>>> .option("startingOffsets", "latest") \ >>>>>> .option("failOnDataLoss", "false") \ >>>>>> .option("kafka.metadata.max.age.ms", "1000") \ >>>>>> .option("kafka.ssl.keystore.type", "PKCS12") \ >>>>>> .option("kafka.ssl.truststore.type", "PKCS12") \ >>>>>> .load() >>>>>> >>>>>> logger.info(f" df_stream, calling convertToDictForEachBatch -> >>>>>> {df_stream}") >>>>>> # trigger once >>>>>> >>>>>> query = df_stream.selectExpr("CAST(value AS STRING)", >>>>>> "timestamp", "topic").writeStream \ >>>>>> .outputMode("append") \ >>>>>> .trigger(processingTime='10 minutes') \ >>>>>> .option("truncate", "false") \ >>>>>> .option("checkpointLocation", self.checkpoint) \ >>>>>> .foreachBatch(self.convertToDictForEachBatch) \ >>>>>> .start() >>>>>> >>>>>> ``` >>>>>> >>>>>> I'm unable to understand why error states - bucket.storage.create >>>>>> privilege not there when the code is actually reading data from kafka >>>>>> >>>>>> Any ideas on how to debug/fix ? >>>>>> >>>>>> Anyone used the kubeflow spark-operator (v3.1.1) for streaming jobs >>>>>> on kubernetes ? >>>>>> >>>>>> tia! >>>>>> >>>>>> here is stack overflow ticket -> >>>>>> >>>>>> >>>>>> https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea >>>>>> >>>>>> >>>>>> >>>>>> >>>>>>