Thanks Nimrod, I was able to get this working. The key changes:

1. Since I was using webhook.namespaceSelector="spark-webhook-enabled=true" in the helm install, the namespace also needs to carry the label spark-webhook-enabled=true - that is what ensures the webhook actually injects the volumes/volumeMounts/configMaps into the pods.
2. I changed the volume/volumeMount for the GCP secret to secretType: GCPServiceAccount (though I think the plain volume/volumeMount might work as well, after change 1):

   secrets:
     - name: spark-gcs-creds
       path: /mnt/secrets
       secretType: GCPServiceAccount
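For anyone hitting the same thing later, a minimal sketch of the two pieces involved - the helm repo/release names and the operator namespace are assumptions; only the selector value and the job namespace so350 come from this thread:

```
# Label the job namespace so it matches the operator webhook's namespaceSelector;
# without the label the webhook skips the pods and never injects the secret volume.
kubectl label namespace so350 spark-webhook-enabled=true

# Operator install with the matching selector (repo/release names are placeholders).
helm upgrade --install spark-operator spark-operator/spark-operator \
  --namespace spark-operator --create-namespace \
  --set webhook.namespaceSelector="spark-webhook-enabled=true"
```

Per the Kubeflow GCP guide linked in the reply below, secretType: GCPServiceAccount is also expected to point GOOGLE_APPLICATION_CREDENTIALS at the mounted key file, which is why it can replace the hand-rolled volume/volumeMount.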
On Mon, Oct 7, 2024 at 2:40 AM Nimrod Ofek <ofek.nim...@gmail.com> wrote:

> Hi Karan,
>
> It seems that the way you tried to integrate with GCP is not the way that
> Kubeflow proposes. They set up another way to use the secret for some reason...
> Here <https://www.kubeflow.org/docs/components/spark-operator/user-guide/gcp/>
> you can see how they tell you to do that.
>
> HTH,
> Nimrod
>
> On Sun, Oct 6, 2024 at 9:37 PM karan alang <karan.al...@gmail.com> wrote:
>
>> Hi Nimrod,
>>
>> Thanks for the response.
>> The volume gcs-key (representing the secret spark-gcs-creds) is shared
>> between the driver & executor, both of which have the volumeMounts
>> (screenshot attached).
>> [image: Screenshot 2024-10-06 at 11.35.19 AM.png]
>> 'volumes' is defined at the same level as driver & executor.
>> Does this need to be changed?
>>
>> The secret spark-gcs-creds does have the spark-gcs-key.json key, as shown
>> in the snippet below:
>>
>> (base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret spark-gcs-creds -n so350 -o yaml
>> apiVersion: v1
>> data:
>>   spark-gcs-key.json: <--- KEY --->
>> kind: Secret
>> metadata:
>>   creationTimestamp: "2024-10-04T21:54:06Z"
>>   name: spark-gcs-creds
>>   namespace: so350
>>   resourceVersion: "180991552"
>>   uid: ac30c575-9abf-4a77-ba90-15576607c97f
>> type: Opaque
>>
>> Any feedback on this?
>> tia!
>>
>> On Sun, Oct 6, 2024 at 1:16 AM Nimrod Ofek <ofek.nim...@gmail.com> wrote:
>>
>>> Hi Karan,
>>>
>>> In the executor I see you have a volume (and not just volumeMounts) - I
>>> don't see a volume in the driver - so you probably don't have such a
>>> volume there.
>>> Also - please check that the secret has spark-gcs-key.json in its data
>>> section - you can check it with:
>>> kubectl describe secret <secret-name>
>>>
>>> HTH,
>>> Nimrod
>>>
>>> On Sat, Oct 5, 2024 at 11:20 PM karan alang <karan.al...@gmail.com> wrote:
>>>
>>>> Thanks Nimrod!
>>>> Yes, the checkpoint is in GCS and, as you pointed out, the service account
>>>> being used did not have access to the GCS storage bucket. I was able to add
>>>> the permissions and the GCS bucket is accessible now.
>>>>
>>>> However, here is another issue - I've mounted a secret 'spark-gcs-creds'
>>>> which contains the key for the service account 'spark' being used by the
>>>> application; however, the secret is not getting mounted.
>>>>
>>>> Here is the error:
>>>> ```
>>>> 24/10/05 20:10:36 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
>>>> 24/10/05 20:10:38 WARN FileSystem: Failed to initialize fileystem gs://vkp-spark-history-server/spark-events: java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such file or directory)
>>>> 24/10/05 20:10:38 ERROR SparkContext: Error initializing SparkContext.
>>>> java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such file or directory)
>>>>     at java.base/java.io.FileInputStream.open0(Native Method)
>>>>     at java.base/java.io.FileInputStream.open(Unknown Source)
>>>>     at java.base/java.io.FileInputStream.<init>(Unknown Source)
>>>>     at java.base/java.io.FileInputStream.<init>(Unknown Source)
>>>>     at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromJsonKeyFile(CredentialFactory.java:297)
>>>>     at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:414)
>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1479)
>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1638)
>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1620)
>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:507)
>>>>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
>>>>     at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
>>>>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
>>>>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
>>>> ```
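A quick way to narrow down where this breaks - whether the secret itself is wrong, or the webhook never injected the volume into the driver pod. A debugging sketch; the driver pod name is a placeholder, the secret and namespace names come from the thread:

```
# Does the secret expose the expected key (spark-gcs-key.json)?
kubectl describe secret spark-gcs-creds -n so350

# Did the webhook add the gcs-key volume/volumeMount to the driver pod spec?
kubectl get pod <driver-pod> -n so350 -o yaml | grep -B2 -A4 "gcs-key"

# Is the key file actually visible inside the running driver?
kubectl exec -n so350 <driver-pod> -- ls -l /mnt/secrets
```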
>>>>
>>>> Here is the job yaml ->
>>>>
>>>> apiVersion: "sparkoperator.k8s.io/v1beta2"
>>>> kind: SparkApplication
>>>> metadata:
>>>>   name: structured-streaming-350-{{ now | unixEpoch }}
>>>>   namespace: so350
>>>> spec:
>>>>   type: Python
>>>>   mode: cluster
>>>>   sparkVersion: "3.5.0"
>>>>   image: "us-east1-docker.pkg.dev/versa-kafka-poc/spark-job-repo/ss-main-so350:3.5.0"
>>>>   imagePullPolicy: Always
>>>>   imagePullSecrets:
>>>>     - gcr-json-key
>>>>   mainApplicationFile: "local:///opt/spark/custom-dir/main.py"
>>>>   restartPolicy:
>>>>     type: OnFailure
>>>>     onFailureRetries: 3
>>>>     onFailureRetryInterval: 10
>>>>     onSubmissionFailureRetries: 5
>>>>     onSubmissionFailureRetryInterval: 20
>>>>   driver:
>>>>     cores: {{ .Values.driver.cores }}
>>>>     coreLimit: "{{ .Values.driver.coreLimit }}"
>>>>     memory: "{{ .Values.driver.memory }}"
>>>>     labels:
>>>>       version: 3.5.0
>>>>     serviceAccount: spark
>>>>     securityContext:
>>>>       runAsUser: 185 # UID for spark user
>>>>     volumeMounts:
>>>>       - name: gcs-key
>>>>         mountPath: /mnt/secrets
>>>>         readOnly: true
>>>>     initContainers:
>>>>       - name: init1
>>>>         image: busybox:1.28
>>>>         command: ['sh', '-c', "until nslookup myservice.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done"]
>>>>       - name: init2
>>>>         image: busybox
>>>>         command:
>>>>           - /bin/sh
>>>>           - "-c"
>>>>           - |
>>>>             echo "Checking mounted secrets";
>>>>             ls -l /mnt/secrets;
>>>>             cat /mnt/secrets/spark-gcs-key.json;
>>>>             sleep 180; # Keep the init container running for inspection
>>>>   executor:
>>>>     cores: {{ .Values.executor.cores }}
>>>>     instances: {{ .Values.executor.instances }}
>>>>     memory: "{{ .Values.executor.memory }}"
>>>>     labels:
>>>>       version: "3.5.0"
>>>>     securityContext:
>>>>       runAsUser: 185 # UID for spark user
>>>>     volumeMounts:
>>>>       - name: gcs-key
>>>>         mountPath: /mnt/secrets
>>>>         readOnly: true
>>>>   volumes:
>>>>     - name: gcs-key
>>>>       secret:
>>>>         secretName: spark-gcs-creds
>>>>   deps:
>>>>     jars:
>>>>       - local:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar
>>>>       - local:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar
>>>>       - local:///opt/spark/other-jars/bson-4.0.5.jar
>>>>       - local:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar
>>>>       - local:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar
>>>>       - local:///opt/spark/other-jars/spark-sql-kafka-0-10_2.12-3.5.0.jar
>>>>       - local:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar
>>>>       - local:///opt/spark/other-jars/spark-token-provider-kafka-0-10_2.12-3.5.0.jar
>>>>       - local:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar
>>>>       - local:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar
>>>>       - local:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar
>>>>       - local:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar
>>>>       - local:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar
>>>>       - local:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar
>>>>       - local:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar
>>>>       - local:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar
>>>>       - local:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar
>>>>     pyFiles:
>>>>       - local:///opt/spark/zips/streams.zip
>>>>       - local:///opt/spark/zips/utils.zip
>>>>     files:
>>>>       - local:///opt/spark/certs/syslog-vani-prefix.p12
>>>>       - local:///opt/spark/certs/versa-alarmblock-test-user.p12
>>>>       - local:///opt/spark/certs/versa-appstat-test-user.p12
>>>>       - local:///opt/spark/certs/versa-bandwidth-test-user.p12
>>>>       - local:///opt/spark/certs/intfutil-user-test.p12
>>>>       - local:///opt/spark/certs/alarm-compression-user-test.p12
>>>>       - local:///opt/spark/certs/alarmblock-user-test.p12
>>>>       - local:///opt/spark/certs/appstat-agg-user-test.p12
>>>>       - local:///opt/spark/certs/appstat-anomaly-user-test.p12
>>>>       - local:///opt/spark/certs/appstats-user-test.p12
>>>>       - local:///opt/spark/certs/insights-user-test.p12
>>>>       - local:///opt/spark/cfg/params.cfg
>>>>       - local:///opt/spark/cfg/params_password.cfg
>>>>   pythonVersion: "{{ .Values.pythonVersion }}"
>>>>   sparkConf:
>>>>     "spark.hadoop.google.cloud.project.id": "versa-kafka-poc"
>>>>     "spark.hadoop.fs.gs.project.id": "versa-kafka-poc"
>>>>     "spark.kafka.ssl.keystore.location": "/opt/spark/certs/syslog-vani-prefix.p12"
>>>>     "spark.kafka.ssl.truststore.location": "/opt/spark/certs/versa-kafka-poc-tf-ca.p12"
>>>>     "spark.kubernetes.namespace": "so350"
>>>>     "spark.kubernetes.authenticate.driver.serviceAccountName": "spark"
>>>>     "spark.kubernetes.container.image": "{{ .Values.image }}"
>>>>     "spark.kubernetes.driver.container.image": "{{ .Values.image }}"
>>>>     "spark.kubernetes.executor.container.image": "{{ .Values.image }}"
>>>>     "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
>>>>     "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
>>>>     "spark.hadoop.google.cloud.auth.service.account.enable": "true"
>>>>     "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "/mnt/secrets/spark-gcs-key.json"
>>>>     "spark.eventLog.enabled": "true"
>>>>     "spark.eventLog.dir": "{{ .Values.sparkEventLogDir }}"
>>>>     "spark.hadoop.fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
>>>>     "spark.dynamicAllocation.enabled": "false"
>>>>     "spark.dynamicAllocation.executorIdleTimeout": "120s"
>>>>     "spark.shuffle.service.enabled": "false"
"spark.kubernetes.executor.deleteOnTermination": "false" >>>> "spark.jars": >>>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar" >>>> "spark.driver.extraClassPath": >>>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar" >>>> "spark.executor.extraClassPath": >>>> 
"file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar" >>>> "spark.submit.pyFiles": >>>> "file:///opt/spark/zips/streams.zip,file:///opt/spark/zips/utils.zip" >>>> hadoopConf: >>>> "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" >>>> "fs.AbstractFileSystem.gs.impl": >>>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS" >>>> "google.cloud.auth.service.account.enable": "true" >>>> "google.cloud.auth.service.account.json.keyfile": >>>> "/mnt/secrets/spark-gcs-key.json" >>>> arguments: >>>> - "--isdebug={{ .Values.isdebug }}" >>>> - "--istest={{ .Values.isdebug }}" >>>> >>>> here is snapshot of the secret : >>>> >>>> ``` >>>> >>>> (base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret >>>> spark-gcs-creds -n so350 -o yaml >>>> >>>> apiVersion: v1 >>>> >>>> data: >>>> >>>> spark-gcs-key.json: >>>> >>>> <--- KEY ---> >>>> >>>> kind: Secret >>>> >>>> metadata: >>>> >>>> creationTimestamp: "2024-10-04T21:54:06Z" >>>> >>>> name: spark-gcs-creds >>>> >>>> namespace: so350 >>>> >>>> resourceVersion: "180991552" >>>> >>>> uid: ac30c575-9abf-4a77-ba90-15576607c97f >>>> >>>> type: Opaque >>>> >>>> ``` >>>> >>>> Any ideas on how to debug/fix this ? >>>> >>>> tia! >>>> >>>> >>>> >>>> On Thu, Oct 3, 2024 at 12:37 AM Nimrod Ofek <ofek.nim...@gmail.com> >>>> wrote: >>>> >>>>> Where is the checkpoint location? Not in GCS? >>>>> Probably the location of the checkpoint is there- and you don't have >>>>> permissions for that... >>>>> >>>>> בתאריך יום ה׳, 3 באוק׳ 2024, 02:43, מאת karan alang < >>>>> karan.al...@gmail.com>: >>>>> >>>>>> This seems to be the cause of this -> >>>>>> github.com/kubeflow/spark-operator/issues/1619 .. the secret is not >>>>>> getting mounted die to this error -> MountVolume.SetUp failed for volume >>>>>> “spark-conf-volume-driver >>>>>> >>>>>> I'm getting same error in event logs, and the secret mounted is not >>>>>> getting read >>>>>> >>>>>> If anyone is using spark-operator, and run into this issue .. pls let >>>>>> me know. >>>>>> Alternatively, is there another operator that can be used ? >>>>>> >>>>>> thanks! >>>>>> >>>>>> >>>>>> On Tue, Oct 1, 2024 at 3:41 PM karan alang <karan.al...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> I've kubeflow spark-operator installed on K8s (GKE), and i'm running >>>>>>> a structured streaming job which reads data from kafka .. the job is run >>>>>>> every 10 mins. 
>>>>>> On Tue, Oct 1, 2024 at 3:41 PM karan alang <karan.al...@gmail.com> wrote:
>>>>>>
>>>>>>> I have the kubeflow spark-operator installed on K8s (GKE), and I'm running
>>>>>>> a structured streaming job which reads data from kafka .. the job is run
>>>>>>> every 10 mins.
>>>>>>>
>>>>>>> It is giving the error shown below:
>>>>>>>
>>>>>>> ```
>>>>>>> Traceback (most recent call last):
>>>>>>>   File "/opt/spark/custom-dir/main.py", line 356, in <module>
>>>>>>>     sys.exit(main())
>>>>>>>   File "/opt/spark/custom-dir/main.py", line 352, in main
>>>>>>>     ss.readData()
>>>>>>>   File "/opt/spark/custom-dir/main.py", line 327, in readData
>>>>>>>     query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>>>>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line 1491, in start
>>>>>>>     return self._sq(self._jwrite.start())
>>>>>>>   File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1304, in __call__
>>>>>>>     return_value = get_return_value(
>>>>>>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 111, in deco
>>>>>>>     return f(*a, **kw)
>>>>>>>   File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326, in get_return_value
>>>>>>>     raise Py4JJavaError(
>>>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
>>>>>>> : com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
>>>>>>> POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
>>>>>>> {
>>>>>>>   "code" : 403,
>>>>>>>   "errors" : [ {
>>>>>>>     "domain" : "global",
>>>>>>>     "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
>>>>>>>     "reason" : "forbidden"
>>>>>>>   } ],
>>>>>>>   "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist)."
>>>>>>> }
>>>>>>> ```
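Given the resolution earlier in the thread (the service account lacked access to the GCS bucket holding the checkpoint), the fix for this 403 is on the IAM side rather than in the job. A hedged sketch of the kind of check and grant involved; the bucket name and the full service-account address are placeholders, since the address is elided in the error above:

```
# Does the checkpoint bucket exist and is it reachable?
gsutil ls gs://<checkpoint-bucket>/

# Grant the job's service account object access on the bucket
# (roles/storage.admin on the bucket if bucket-metadata access is also needed).
gsutil iam ch \
  "serviceAccount:<spark-gcs-sa>@versa-kafka-poc.iam.gserviceaccount.com:roles/storage.objectAdmin" \
  gs://<checkpoint-bucket>
```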
>>>>>>>
>>>>>>> Code reading data from kafka on GKE:
>>>>>>>
>>>>>>> ```
>>>>>>> df_stream = self.spark.readStream.format('kafka') \
>>>>>>>     .option("kafka.security.protocol", "SSL") \
>>>>>>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>>>>>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>>>>>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
>>>>>>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
>>>>>>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>>>>>     .option("subscribePattern", "topic") \
>>>>>>>     .option("startingOffsets", "latest") \
>>>>>>>     .option("failOnDataLoss", "false") \
>>>>>>>     .option("kafka.metadata.max.age.ms", "1000") \
>>>>>>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>>>>>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>>>>>>     .load()
>>>>>>>
>>>>>>> logger.info(f" df_stream, calling convertToDictForEachBatch -> {df_stream}")
>>>>>>>
>>>>>>> # trigger once
>>>>>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>>>>>     .outputMode("append") \
>>>>>>>     .trigger(processingTime='10 minutes') \
>>>>>>>     .option("truncate", "false") \
>>>>>>>     .option("checkpointLocation", self.checkpoint) \
>>>>>>>     .foreachBatch(self.convertToDictForEachBatch) \
>>>>>>>     .start()
>>>>>>> ```
>>>>>>>
>>>>>>> I'm unable to understand why the error says the storage.buckets.create
>>>>>>> permission is missing when the code is actually reading data from kafka.
>>>>>>>
>>>>>>> Any ideas on how to debug/fix?
>>>>>>>
>>>>>>> Has anyone used the kubeflow spark-operator (v3.1.1) for streaming jobs
>>>>>>> on kubernetes?
>>>>>>>
>>>>>>> tia!
>>>>>>>
>>>>>>> Here is the stack overflow ticket ->
>>>>>>> https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea
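On the question of why a job that only reads from Kafka trips a storage error: the checkpointLocation handed to writeStream lives in GCS, and Spark initialises that directory when query.start() is called - before any Kafka data is consumed - which is where the 403 surfaces. A sketch for pre-creating the checkpoint bucket so the GCS connector never needs storage.buckets.create; the bucket name and location are assumptions, only the project id comes from the error message:

```
# Pre-create the checkpoint bucket in the project from the error message,
# so the connector only needs object read/write, not storage.buckets.create.
gsutil mb -p versa-kafka-poc -l us-east1 gs://<checkpoint-bucket>

# Sanity check that the bucket is now visible.
gsutil ls -p versa-kafka-poc | grep "<checkpoint-bucket>"
```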