Hi Karan,

It seems the way you tried to integrate with GCP is not the way the Kubeflow
Spark operator expects - the operator provides its own mechanism for mounting a
GCP service-account secret. The GCP guide here describes it:
https://www.kubeflow.org/docs/components/spark-operator/user-guide/gcp/
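
Roughly, instead of defining volumes/volumeMounts yourself, the guide uses the
operator's "secrets" field on the driver and executor. A minimal sketch using
your secret name (adjust as needed - and if I recall correctly, with
secretType: GCPServiceAccount the operator also sets GOOGLE_APPLICATION_CREDENTIALS
and expects the key inside the secret to be named key.json, so you may need to
rename spark-gcs-key.json):

```
spec:
  driver:
    secrets:
      - name: spark-gcs-creds        # your existing secret
        path: /mnt/secrets           # mount path inside the pod
        secretType: GCPServiceAccount
  executor:
    secrets:
      - name: spark-gcs-creds
        path: /mnt/secrets
        secretType: GCPServiceAccount
```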

HTH,
Nimrod


On Sun, Oct 6, 2024 at 9:37 PM karan alang <karan.al...@gmail.com> wrote:

> Hi Nimrod,
>
> Thanks for the response ..
> The volume gcs-key (backed by the secret spark-gcs-creds) is shared between
> the driver and executor, both of which have the corresponding volumeMounts
> (screenshot attached).
> [image: Screenshot 2024-10-06 at 11.35.19 AM.png]
> 'volumes' is defined at the same level as driver & executor. Does this need
> to be changed?
>
> the secret spark-gcs-creds does have the spark-gcs-key.json key, as shown in
> the output below:
>
> ```
> (base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret spark-gcs-creds -n so350 -o yaml
> apiVersion: v1
> data:
>   spark-gcs-key.json: <--- KEY --->
> kind: Secret
> metadata:
>   creationTimestamp: "2024-10-04T21:54:06Z"
>   name: spark-gcs-creds
>   namespace: so350
>   resourceVersion: "180991552"
>   uid: ac30c575-9abf-4a77-ba90-15576607c97f
> type: Opaque
> ```
>
> Any feedback on this?
> tia!
>
>
> On Sun, Oct 6, 2024 at 1:16 AM Nimrod Ofek <ofek.nim...@gmail.com> wrote:
>
>> Hi Karan,
>>
>> In the executor I see you have a volume (and not just a volumeMount) - I
>> don't see a volume in the driver, so the driver pod probably doesn't have
>> such a volume.
>> Also - please check whether the secret has spark-gcs-key.json in its data
>> section - you can check it with:
>> kubectl describe secret <secret-name>
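>>
>> For example, something like:
>>
>> ```
>> kubectl describe secret spark-gcs-creds -n so350
>> # or dump the (base64-encoded) data keys directly:
>> kubectl get secret spark-gcs-creds -n so350 -o jsonpath='{.data}'
>> ```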
>>
>> HTH,
>> Nimrod
>>
>>
>> On Sat, Oct 5, 2024 at 11:20 PM karan alang <karan.al...@gmail.com>
>> wrote:
>>
>>> Thanks Nimrod !
>>> Yes, the checkpoint is in GCS, and as you pointed out the service account
>>> being used did not have access to the GCS storage bucket. I was able to add
>>> the permissions, and the GCS bucket is accessible now.
>>>
>>> However, here is another issue - I've mounted a secret 'spark-gcs-creds'
>>> which contains the key for the service account 'spark' used by the
>>> application, but the secret is not getting mounted.
>>>
>>> here is the error :
>>> ```
>>>
>>> 24/10/05 20:10:36 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
>>> 24/10/05 20:10:38 WARN FileSystem: Failed to initialize fileystem gs://vkp-spark-history-server/spark-events: java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such file or directory)
>>> 24/10/05 20:10:38 ERROR SparkContext: Error initializing SparkContext.
>>> java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such file or directory)
>>>     at java.base/java.io.FileInputStream.open0(Native Method)
>>>     at java.base/java.io.FileInputStream.open(Unknown Source)
>>>     at java.base/java.io.FileInputStream.<init>(Unknown Source)
>>>     at java.base/java.io.FileInputStream.<init>(Unknown Source)
>>>     at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromJsonKeyFile(CredentialFactory.java:297)
>>>     at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:414)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1479)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1638)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1620)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:507)
>>>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
>>>     at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
>>>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
>>>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
>>> ```
>>>
>>> here is the job yaml ->
>>>
>>> apiVersion: "sparkoperator.k8s.io/v1beta2"
>>> kind: SparkApplication
>>> metadata:
>>> name: structured-streaming-350-{{ now | unixEpoch }}
>>> namespace: so350
>>> spec:
>>> type: Python
>>> mode: cluster
>>> sparkVersion: "3.5.0"
>>> image: "
>>> us-east1-docker.pkg.dev/versa-kafka-poc/spark-job-repo/ss-main-so350:3.5.0
>>> "
>>> imagePullPolicy: Always
>>> imagePullSecrets:
>>> - gcr-json-key
>>> mainApplicationFile: "local:///opt/spark/custom-dir/main.py"
>>> restartPolicy:
>>> type: OnFailure
>>> onFailureRetries: 3
>>> onFailureRetryInterval: 10
>>> onSubmissionFailureRetries: 5
>>> onSubmissionFailureRetryInterval: 20
>>> driver:
>>> cores: {{ .Values.driver.cores }}
>>> coreLimit: "{{ .Values.driver.coreLimit }}"
>>> memory: "{{ .Values.driver.memory }}"
>>> labels:
>>> version: 3.5.0
>>> serviceAccount: spark
>>> securityContext:
>>> runAsUser: 185 # UID for spark user
>>> volumeMounts:
>>> - name: gcs-key
>>> mountPath: /mnt/secrets
>>> readOnly: true
>>> initContainers:
>>> - name: init1
>>> image: busybox:1.28
>>> command: ['sh', '-c', "until nslookup myservice.$(cat /var/run/secrets/
>>> kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo
>>> waiting for myservice; sleep 2; done"]
>>> - name: init2
>>> image: busybox
>>> command:
>>> - /bin/sh
>>> - "-c"
>>> - |
>>> echo "Checking mounted secrets";
>>> ls -l /mnt/secrets;
>>> cat /mnt/secrets/spark-gcs-key.json;
>>> sleep 180; # Keep the init container running for inspection
>>> executor:
>>> cores: {{ .Values.executor.cores }}
>>> instances: {{ .Values.executor.instances }}
>>> memory: "{{ .Values.executor.memory }}"
>>> labels:
>>> version: "3.5.0"
>>> securityContext:
>>> runAsUser: 185 # UID for spark user
>>> volumeMounts:
>>> - name: gcs-key
>>> mountPath: /mnt/secrets
>>> readOnly: true
>>> volumes:
>>> - name: gcs-key
>>> secret:
>>> secretName: spark-gcs-creds
>>> deps:
>>> jars:
>>> - local:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar
>>> - local:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar
>>> - local:///opt/spark/other-jars/bson-4.0.5.jar
>>> - local:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar
>>> - local:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar
>>> - local:///opt/spark/other-jars/spark-sql-kafka-0-10_2.12-3.5.0.jar
>>> -
>>> local:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar
>>> -
>>> local:///opt/spark/other-jars/spark-token-provider-kafka-0-10_2.12-3.5.0.jar
>>> - local:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar
>>> -
>>> local:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar
>>> - local:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar
>>> - local:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar
>>> -
>>> local:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar
>>> - local:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar
>>> -
>>> local:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar
>>> - local:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar
>>> -
>>> local:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar
>>> pyFiles:
>>> - local:///opt/spark/zips/streams.zip
>>> - local:///opt/spark/zips/utils.zip
>>> files:
>>> - local:///opt/spark/certs/syslog-vani-prefix.p12
>>> - local:///opt/spark/certs/versa-alarmblock-test-user.p12
>>> - local:///opt/spark/certs/versa-appstat-test-user.p12
>>> - local:///opt/spark/certs/versa-bandwidth-test-user.p12
>>> - local:///opt/spark/certs/intfutil-user-test.p12
>>> - local:///opt/spark/certs/alarm-compression-user-test.p12
>>> - local:///opt/spark/certs/alarmblock-user-test.p12
>>> - local:///opt/spark/certs/appstat-agg-user-test.p12
>>> - local:///opt/spark/certs/appstat-anomaly-user-test.p12
>>> - local:///opt/spark/certs/appstats-user-test.p12
>>> - local:///opt/spark/certs/insights-user-test.p12
>>> - local:///opt/spark/cfg/params.cfg
>>> - local:///opt/spark/cfg/params_password.cfg
>>> pythonVersion: "{{ .Values.pythonVersion }}"
>>> sparkConf:
>>> "spark.hadoop.google.cloud.project.id": "versa-kafka-poc"
>>> "spark.hadoop.fs.gs.project.id": "versa-kafka-poc"
>>> "spark.kafka.ssl.keystore.location":
>>> "/opt/spark/certs/syslog-vani-prefix.p12"
>>> "spark.kafka.ssl.truststore.location":
>>> "/opt/spark/certs/versa-kafka-poc-tf-ca.p12"
>>> "spark.kubernetes.namespace": "so350"
>>> "spark.kubernetes.authenticate.driver.serviceAccountName": "spark"
>>> "spark.kubernetes.container.image": "{{ .Values.image }}"
>>> "spark.kubernetes.driver.container.image": "{{ .Values.image }}"
>>> "spark.kubernetes.executor.container.image": "{{ .Values.image }}"
>>> "spark.hadoop.fs.gs.impl":
>>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
>>> "spark.hadoop.fs.AbstractFileSystem.gs.impl":
>>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
>>> "spark.hadoop.google.cloud.auth.service.account.enable": "true"
>>> "spark.hadoop.google.cloud.auth.service.account.json.keyfile":
>>> "/mnt/secrets/spark-gcs-key.json"
>>> "spark.eventLog.enabled": "true"
>>> "spark.eventLog.dir": "{{ .Values.sparkEventLogDir }}"
>>> "spark.hadoop.fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
>>> "spark.dynamicAllocation.enabled": "false"
>>> "spark.dynamicAllocation.executorIdleTimeout": "120s"
>>> "spark.shuffle.service.enabled": "false"
>>> "spark.kubernetes.executor.deleteOnTermination": "false"
>>> "spark.jars":
>>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
>>> "spark.driver.extraClassPath":
>>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
>>> "spark.executor.extraClassPath":
>>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
>>> "spark.submit.pyFiles":
>>> "file:///opt/spark/zips/streams.zip,file:///opt/spark/zips/utils.zip"
>>> hadoopConf:
>>> "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
>>> "fs.AbstractFileSystem.gs.impl":
>>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
>>> "google.cloud.auth.service.account.enable": "true"
>>> "google.cloud.auth.service.account.json.keyfile":
>>> "/mnt/secrets/spark-gcs-key.json"
>>> arguments:
>>> - "--isdebug={{ .Values.isdebug }}"
>>> - "--istest={{ .Values.isdebug }}"
>>>
>>> here is a snapshot of the secret:
>>>
>>> ```
>>>
>>> (base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret spark-gcs-creds -n so350 -o yaml
>>> apiVersion: v1
>>> data:
>>>   spark-gcs-key.json: <--- KEY --->
>>> kind: Secret
>>> metadata:
>>>   creationTimestamp: "2024-10-04T21:54:06Z"
>>>   name: spark-gcs-creds
>>>   namespace: so350
>>>   resourceVersion: "180991552"
>>>   uid: ac30c575-9abf-4a77-ba90-15576607c97f
>>> type: Opaque
>>>
>>> ```
>>>
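>>> For reference, here is one way I plan to check whether the mount actually
>>> reached the driver pod (the pod name below is a placeholder):
>>>
>>> ```
>>> kubectl get pod <driver-pod-name> -n so350 -o yaml | grep -B2 -A4 volumeMounts
>>> kubectl exec -it <driver-pod-name> -n so350 -- ls -l /mnt/secrets
>>> ```
>>>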
>>> Any ideas on how to debug/fix this?
>>>
>>> tia!
>>>
>>>
>>>
>>> On Thu, Oct 3, 2024 at 12:37 AM Nimrod Ofek <ofek.nim...@gmail.com>
>>> wrote:
>>>
>>>> Where is the checkpoint location? Not in GCS?
>>>> The checkpoint location is probably there, and you don't have
>>>> permissions for it...
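>>>>
>>>> If so, granting the job's service account access to the checkpoint bucket
>>>> should do it - something along these lines (the bucket and service-account
>>>> values are placeholders):
>>>>
>>>> gsutil iam ch serviceAccount:<service-account-email>:roles/storage.objectAdmin gs://<checkpoint-bucket>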
>>>>
>>>> On Thu, Oct 3, 2024 at 2:43 AM karan alang <karan.al...@gmail.com> wrote:
>>>>
>>>>> This seems to be the cause ->
>>>>> github.com/kubeflow/spark-operator/issues/1619 .. the secret is not
>>>>> getting mounted due to this error -> MountVolume.SetUp failed for volume
>>>>> "spark-conf-volume-driver"
>>>>>
>>>>> I'm getting the same error in the event logs, and the mounted secret is
>>>>> not getting read.
>>>>>
>>>>> If anyone using spark-operator has run into this issue, please let me
>>>>> know. Alternatively, is there another operator that can be used?
>>>>>
>>>>> thanks!
>>>>>
>>>>>
>>>>> On Tue, Oct 1, 2024 at 3:41 PM karan alang <karan.al...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have the kubeflow spark-operator installed on K8s (GKE), and I'm running
>>>>>> a structured streaming job which reads data from Kafka .. the job runs
>>>>>> every 10 mins.
>>>>>>
>>>>>> It is giving an error shown below:
>>>>>>
>>>>>> ```
>>>>>>
>>>>>> Traceback (most recent call last):
>>>>>>   File "/opt/spark/custom-dir/main.py", line 356, in <module>
>>>>>>     sys.exit(main())
>>>>>>   File "/opt/spark/custom-dir/main.py", line 352, in main
>>>>>>     ss.readData()
>>>>>>   File "/opt/spark/custom-dir/main.py", line 327, in readData
>>>>>>     query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>>>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line 1491, in start
>>>>>>     return self._sq(self._jwrite.start())
>>>>>>   File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1304, in __call__
>>>>>>     return_value = get_return_value(
>>>>>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 111, in deco
>>>>>>     return f(*a, **kw)
>>>>>>   File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326, in get_return_value
>>>>>>     raise Py4JJavaError(
>>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
>>>>>> : com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
>>>>>> POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
>>>>>> {
>>>>>>   "code" : 403,
>>>>>>   "errors" : [ {
>>>>>>     "domain" : "global",
>>>>>>     "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
>>>>>>     "reason" : "forbidden"
>>>>>>   } ],
>>>>>>   "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist)."
>>>>>> }
>>>>>>
>>>>>>
>>>>>> ```
>>>>>>
>>>>>> Code reading data from kafka on GKE :
>>>>>>
>>>>>>
>>>>>> ```
>>>>>>
>>>>>> df_stream = self.spark.readStream.format('kafka') \
>>>>>>     .option("kafka.security.protocol", "SSL") \
>>>>>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>>>>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>>>>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
>>>>>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
>>>>>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>>>>     .option("subscribePattern", "topic") \
>>>>>>     .option("startingOffsets", "latest") \
>>>>>>     .option("failOnDataLoss", "false") \
>>>>>>     .option("kafka.metadata.max.age.ms", "1000") \
>>>>>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>>>>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>>>>>     .load()
>>>>>>
>>>>>> logger.info(f" df_stream, calling convertToDictForEachBatch -> {df_stream}")
>>>>>>
>>>>>> # trigger once
>>>>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>>>>     .outputMode("append") \
>>>>>>     .trigger(processingTime='10 minutes') \
>>>>>>     .option("truncate", "false") \
>>>>>>     .option("checkpointLocation", self.checkpoint) \
>>>>>>     .foreachBatch(self.convertToDictForEachBatch) \
>>>>>>     .start()
>>>>>> ```
>>>>>>
>>>>>> I'm unable to understand why the error says the storage.buckets.create
>>>>>> privilege is missing, when the code is only reading data from kafka.
>>>>>>
>>>>>> Any ideas on how to debug/fix ?
>>>>>>
>>>>>> Has anyone used the kubeflow spark-operator (v3.1.1) for streaming jobs
>>>>>> on kubernetes?
>>>>>>
>>>>>> tia!
>>>>>>
>>>>>> here is the stack overflow ticket ->
>>>>>>
>>>>>>
>>>>>> https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
