Thanks Nimrod,

I was able to get this working .. the key changes ->

1. since I was using webhook.namespaceSelector="spark-webhook-enabled=true"
in the helm install,
the namespace also need to have the label - spark-webhook-enabled=true,
that ensures the volume/volumeMounts/configMaps are loaded

2. I changed the volume/volumeMount for the GCP secrets to secretType:
GCPServiceAccount (though i think the volume/volumeMount might work as
well, after the change 1)

secrets:
- name: spark-gcs-creds
path: /mnt/secrets
secretType: GCPServiceAccount



On Mon, Oct 7, 2024 at 2:40 AM Nimrod Ofek <ofek.nim...@gmail.com> wrote:

> Hi Karan,
>
> It seems that the way you tried to integrate with GCP is not the way that
> Kubeflow proposes. They set another way to use the secret for some reason...
> Here
> <https://www.kubeflow.org/docs/components/spark-operator/user-guide/gcp/> you
> can see how they tell you to do that.
>
> HTH,
> Nimrod
>
>
> On Sun, Oct 6, 2024 at 9:37 PM karan alang <karan.al...@gmail.com> wrote:
>
>> Hi Nimrod,
>>
>> Thanks for the response ..
>> The volume gcs-key(representing secret - spark-gcs-creds) is shared
>> between the driver & executor both of which have the volumeMounts
>> (screenshot attached).
>> [image: Screenshot 2024-10-06 at 11.35.19 AM.png]
>>  'volumes' is defined at the same level as driver & executor
>>  Does this need to be changed ?
>>
>> the volume spark-gcs-creds does have the spark-gcs-key.json as shown in
>> screenshot below:
>>
>> (base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret
>> spark-gcs-creds -n so350 -o yaml
>>
>> apiVersion: v1
>>
>> data:
>>
>>   spark-gcs-key.json:
>>
>>   <--- KEY --->
>>
>> kind: Secret
>>
>> metadata:
>>
>>   creationTimestamp: "2024-10-04T21:54:06Z"
>>
>>   name: spark-gcs-creds
>>
>>   namespace: so350
>>
>>   resourceVersion: "180991552"
>>
>>   uid: ac30c575-9abf-4a77-ba90-15576607c97f
>>
>> type: Opaque
>>
>> any feedback on this
>> tia!
>>
>>
>> On Sun, Oct 6, 2024 at 1:16 AM Nimrod Ofek <ofek.nim...@gmail.com> wrote:
>>
>>> Hi Karan,
>>>
>>> In the executor I see you have volume (and not just volumeMounts)- I
>>> don't see you have volume in the driver - so you probably don't have such a
>>> volume.
>>> Also - please check if the volume has in its data section
>>> for spark-gcs-key.json - you can check it with:
>>> describe secret <secret-name>
>>>
>>> HTH,
>>> Nimrod
>>>
>>>
>>> On Sat, Oct 5, 2024 at 11:20 PM karan alang <karan.al...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Nimrod !
>>>> yes the checkpoint is in GCS and as you pointed out the service account
>>>> being used did not have access to the GCS storage bucket, i was able to add
>>>> the permissions and GCS bucket is accessible now ..
>>>>
>>>> however, here is another issue -I've mounted a secret 'spark-gcs-creds'
>>>> which contains the key to the service account 'spark' being used by the
>>>> application .. however the secret is not getting mounted.
>>>>
>>>> here is the error :
>>>> ```
>>>>
>>>> 24/10/05 20:10:36 INFO BasicExecutorFeatureStep: Decommissioning not
>>>> enabled, skipping shutdown script
>>>>
>>>> 24/10/05 20:10:38 WARN FileSystem: Failed to initialize fileystem
>>>> gs://vkp-spark-history-server/spark-events: java.io.FileNotFoundException:
>>>> /mnt/secrets/spark-gcs-key.json (No such file or directory)
>>>>
>>>> 24/10/05 20:10:38 ERROR SparkContext: Error initializing SparkContext.
>>>>
>>>> java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such
>>>> file or directory)
>>>>
>>>> at java.base/java.io.FileInputStream.open0(Native Method)
>>>>
>>>> at java.base/java.io.FileInputStream.open(Unknown Source)
>>>>
>>>> at java.base/java.io.FileInputStream.<init>(Unknown Source)
>>>>
>>>> at java.base/java.io.FileInputStream.<init>(Unknown Source)
>>>>
>>>> at
>>>> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromJsonKeyFile(CredentialFactory.java:297)
>>>>
>>>> at
>>>> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:414)
>>>>
>>>> at
>>>> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1479)
>>>>
>>>> at
>>>> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1638)
>>>>
>>>> at
>>>> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1620)
>>>>
>>>> at
>>>> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:507)
>>>>
>>>> at
>>>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
>>>>
>>>> at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
>>>>
>>>> at
>>>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
>>>>
>>>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
>>>>
>>>> ```
>>>>
>>>> here is the job yaml ->
>>>>
>>>> apiVersion: "sparkoperator.k8s.io/v1beta2"
>>>> kind: SparkApplication
>>>> metadata:
>>>> name: structured-streaming-350-{{ now | unixEpoch }}
>>>> namespace: so350
>>>> spec:
>>>> type: Python
>>>> mode: cluster
>>>> sparkVersion: "3.5.0"
>>>> image: "
>>>> us-east1-docker.pkg.dev/versa-kafka-poc/spark-job-repo/ss-main-so350:3.5.0
>>>> "
>>>> imagePullPolicy: Always
>>>> imagePullSecrets:
>>>> - gcr-json-key
>>>> mainApplicationFile: "local:///opt/spark/custom-dir/main.py"
>>>> restartPolicy:
>>>> type: OnFailure
>>>> onFailureRetries: 3
>>>> onFailureRetryInterval: 10
>>>> onSubmissionFailureRetries: 5
>>>> onSubmissionFailureRetryInterval: 20
>>>> driver:
>>>> cores: {{ .Values.driver.cores }}
>>>> coreLimit: "{{ .Values.driver.coreLimit }}"
>>>> memory: "{{ .Values.driver.memory }}"
>>>> labels:
>>>> version: 3.5.0
>>>> serviceAccount: spark
>>>> securityContext:
>>>> runAsUser: 185 # UID for spark user
>>>> volumeMounts:
>>>> - name: gcs-key
>>>> mountPath: /mnt/secrets
>>>> readOnly: true
>>>> initContainers:
>>>> - name: init1
>>>> image: busybox:1.28
>>>> command: ['sh', '-c', "until nslookup myservice.$(cat /var/run/secrets/
>>>> kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo
>>>> waiting for myservice; sleep 2; done"]
>>>> - name: init2
>>>> image: busybox
>>>> command:
>>>> - /bin/sh
>>>> - "-c"
>>>> - |
>>>> echo "Checking mounted secrets";
>>>> ls -l /mnt/secrets;
>>>> cat /mnt/secrets/spark-gcs-key.json;
>>>> sleep 180; # Keep the init container running for inspection
>>>> executor:
>>>> cores: {{ .Values.executor.cores }}
>>>> instances: {{ .Values.executor.instances }}
>>>> memory: "{{ .Values.executor.memory }}"
>>>> labels:
>>>> version: "3.5.0"
>>>> securityContext:
>>>> runAsUser: 185 # UID for spark user
>>>> volumeMounts:
>>>> - name: gcs-key
>>>> mountPath: /mnt/secrets
>>>> readOnly: true
>>>> volumes:
>>>> - name: gcs-key
>>>> secret:
>>>> secretName: spark-gcs-creds
>>>> deps:
>>>> jars:
>>>> - local:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar
>>>> - local:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar
>>>> - local:///opt/spark/other-jars/bson-4.0.5.jar
>>>> - local:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar
>>>> - local:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar
>>>> - local:///opt/spark/other-jars/spark-sql-kafka-0-10_2.12-3.5.0.jar
>>>> -
>>>> local:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar
>>>> -
>>>> local:///opt/spark/other-jars/spark-token-provider-kafka-0-10_2.12-3.5.0.jar
>>>> -
>>>> local:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar
>>>> -
>>>> local:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar
>>>> - local:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar
>>>> - local:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar
>>>> -
>>>> local:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar
>>>> - local:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar
>>>> -
>>>> local:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar
>>>> - local:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar
>>>> -
>>>> local:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar
>>>> pyFiles:
>>>> - local:///opt/spark/zips/streams.zip
>>>> - local:///opt/spark/zips/utils.zip
>>>> files:
>>>> - local:///opt/spark/certs/syslog-vani-prefix.p12
>>>> - local:///opt/spark/certs/versa-alarmblock-test-user.p12
>>>> - local:///opt/spark/certs/versa-appstat-test-user.p12
>>>> - local:///opt/spark/certs/versa-bandwidth-test-user.p12
>>>> - local:///opt/spark/certs/intfutil-user-test.p12
>>>> - local:///opt/spark/certs/alarm-compression-user-test.p12
>>>> - local:///opt/spark/certs/alarmblock-user-test.p12
>>>> - local:///opt/spark/certs/appstat-agg-user-test.p12
>>>> - local:///opt/spark/certs/appstat-anomaly-user-test.p12
>>>> - local:///opt/spark/certs/appstats-user-test.p12
>>>> - local:///opt/spark/certs/insights-user-test.p12
>>>> - local:///opt/spark/cfg/params.cfg
>>>> - local:///opt/spark/cfg/params_password.cfg
>>>> pythonVersion: "{{ .Values.pythonVersion }}"
>>>> sparkConf:
>>>> "spark.hadoop.google.cloud.project.id": "versa-kafka-poc"
>>>> "spark.hadoop.fs.gs.project.id": "versa-kafka-poc"
>>>> "spark.kafka.ssl.keystore.location":
>>>> "/opt/spark/certs/syslog-vani-prefix.p12"
>>>> "spark.kafka.ssl.truststore.location":
>>>> "/opt/spark/certs/versa-kafka-poc-tf-ca.p12"
>>>> "spark.kubernetes.namespace": "so350"
>>>> "spark.kubernetes.authenticate.driver.serviceAccountName": "spark"
>>>> "spark.kubernetes.container.image": "{{ .Values.image }}"
>>>> "spark.kubernetes.driver.container.image": "{{ .Values.image }}"
>>>> "spark.kubernetes.executor.container.image": "{{ .Values.image }}"
>>>> "spark.hadoop.fs.gs.impl":
>>>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
>>>> "spark.hadoop.fs.AbstractFileSystem.gs.impl":
>>>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
>>>> "spark.hadoop.google.cloud.auth.service.account.enable": "true"
>>>> "spark.hadoop.google.cloud.auth.service.account.json.keyfile":
>>>> "/mnt/secrets/spark-gcs-key.json"
>>>> "spark.eventLog.enabled": "true"
>>>> "spark.eventLog.dir": "{{ .Values.sparkEventLogDir }}"
>>>> "spark.hadoop.fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
>>>> "spark.dynamicAllocation.enabled": "false"
>>>> "spark.dynamicAllocation.executorIdleTimeout": "120s"
>>>> "spark.shuffle.service.enabled": "false"
>>>> "spark.kubernetes.executor.deleteOnTermination": "false"
>>>> "spark.jars":
>>>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
>>>> "spark.driver.extraClassPath":
>>>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
>>>> "spark.executor.extraClassPath":
>>>> "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
>>>> "spark.submit.pyFiles":
>>>> "file:///opt/spark/zips/streams.zip,file:///opt/spark/zips/utils.zip"
>>>> hadoopConf:
>>>> "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
>>>> "fs.AbstractFileSystem.gs.impl":
>>>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
>>>> "google.cloud.auth.service.account.enable": "true"
>>>> "google.cloud.auth.service.account.json.keyfile":
>>>> "/mnt/secrets/spark-gcs-key.json"
>>>> arguments:
>>>> - "--isdebug={{ .Values.isdebug }}"
>>>> - "--istest={{ .Values.isdebug }}"
>>>>
>>>> here is snapshot of the secret :
>>>>
>>>> ```
>>>>
>>>> (base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret
>>>> spark-gcs-creds -n so350 -o yaml
>>>>
>>>> apiVersion: v1
>>>>
>>>> data:
>>>>
>>>>   spark-gcs-key.json:
>>>>
>>>>   <--- KEY --->
>>>>
>>>> kind: Secret
>>>>
>>>> metadata:
>>>>
>>>>   creationTimestamp: "2024-10-04T21:54:06Z"
>>>>
>>>>   name: spark-gcs-creds
>>>>
>>>>   namespace: so350
>>>>
>>>>   resourceVersion: "180991552"
>>>>
>>>>   uid: ac30c575-9abf-4a77-ba90-15576607c97f
>>>>
>>>> type: Opaque
>>>>
>>>> ```
>>>>
>>>> Any ideas on how to debug/fix this  ?
>>>>
>>>> tia!
>>>>
>>>>
>>>>
>>>> On Thu, Oct 3, 2024 at 12:37 AM Nimrod Ofek <ofek.nim...@gmail.com>
>>>> wrote:
>>>>
>>>>> Where is the checkpoint location? Not in GCS?
>>>>> Probably the location of the checkpoint is there- and you don't have
>>>>> permissions for that...
>>>>>
>>>>> בתאריך יום ה׳, 3 באוק׳ 2024, 02:43, מאת karan alang ‏<
>>>>> karan.al...@gmail.com>:
>>>>>
>>>>>> This seems to be the cause of this ->
>>>>>> github.com/kubeflow/spark-operator/issues/1619 .. the secret is not
>>>>>> getting mounted die to this error -> MountVolume.SetUp failed for volume
>>>>>> “spark-conf-volume-driver
>>>>>>
>>>>>> I'm getting same error in event logs, and the secret mounted is not
>>>>>> getting read
>>>>>>
>>>>>> If anyone is using spark-operator, and run into this issue .. pls let
>>>>>> me know.
>>>>>> Alternatively, is there another operator that can be used ?
>>>>>>
>>>>>> thanks!
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 1, 2024 at 3:41 PM karan alang <karan.al...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've kubeflow spark-operator installed on K8s (GKE), and i'm running
>>>>>>> a structured streaming job which reads data from kafka .. the job is run
>>>>>>> every 10 mins.
>>>>>>>
>>>>>>> It is giving an error shown below:
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>> Traceback (most recent call last):
>>>>>>>   File "/opt/spark/custom-dir/main.py", line 356, in <module>
>>>>>>>     sys.exit(main())
>>>>>>>   File "/opt/spark/custom-dir/main.py", line 352, in main
>>>>>>>     ss.readData()
>>>>>>>   File "/opt/spark/custom-dir/main.py", line 327, in readData
>>>>>>>     query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", 
>>>>>>> "topic").writeStream \
>>>>>>>   File 
>>>>>>> "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line 
>>>>>>> 1491, in start
>>>>>>>     return self._sq(self._jwrite.start())
>>>>>>>   File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", 
>>>>>>> line 1304, in __call__
>>>>>>>     return_value = get_return_value(
>>>>>>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", 
>>>>>>> line 111, in deco
>>>>>>>     return f(*a, **kw)
>>>>>>>   File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 
>>>>>>> 326, in get_return_value
>>>>>>>     raise Py4JJavaError(
>>>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
>>>>>>> : 
>>>>>>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException:
>>>>>>>  403 Forbidden
>>>>>>> POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
>>>>>>> {
>>>>>>>   "code" : 403,
>>>>>>>   "errors" : [ {
>>>>>>>     "domain" : "global",
>>>>>>>     "message" : 
>>>>>>> "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have 
>>>>>>> storage.buckets.create access to the Google Cloud project. Permission 
>>>>>>> 'storage.buckets.create' denied on resource (or it may not exist).",
>>>>>>>     "reason" : "forbidden"
>>>>>>>   } ],
>>>>>>>   "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com 
>>>>>>> does not have storage.buckets.create access to the Google Cloud 
>>>>>>> project. Permission 'storage.buckets.create' denied on resource (or it 
>>>>>>> may not exist)."
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>> Code reading data from kafka on GKE :
>>>>>>>
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>> df_stream = self.spark.readStream.format('kafka') \
>>>>>>>             .option("kafka.security.protocol", "SSL") \
>>>>>>>             .option("kafka.ssl.truststore.location", 
>>>>>>> self.ssl_truststore_location) \
>>>>>>>             .option("kafka.ssl.truststore.password", 
>>>>>>> self.ssl_truststore_password) \
>>>>>>>             .option("kafka.ssl.keystore.location", 
>>>>>>> self.ssl_keystore_location) \
>>>>>>>             .option("kafka.ssl.keystore.password", 
>>>>>>> self.ssl_keystore_password) \
>>>>>>>             .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>>>>>             .option("subscribePattern", "topic") \
>>>>>>>             .option("startingOffsets", "latest") \
>>>>>>>             .option("failOnDataLoss", "false") \
>>>>>>>             .option("kafka.metadata.max.age.ms", "1000") \
>>>>>>>             .option("kafka.ssl.keystore.type", "PKCS12") \
>>>>>>>             .option("kafka.ssl.truststore.type", "PKCS12") \
>>>>>>>             .load()
>>>>>>>
>>>>>>>         logger.info(f" df_stream, calling convertToDictForEachBatch -> 
>>>>>>> {df_stream}")
>>>>>>>         # trigger once
>>>>>>>
>>>>>>>         query = df_stream.selectExpr("CAST(value AS STRING)", 
>>>>>>> "timestamp", "topic").writeStream \
>>>>>>>             .outputMode("append") \
>>>>>>>             .trigger(processingTime='10 minutes') \
>>>>>>>             .option("truncate", "false") \
>>>>>>>             .option("checkpointLocation", self.checkpoint) \
>>>>>>>             .foreachBatch(self.convertToDictForEachBatch) \
>>>>>>>             .start()
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>> I'm unable to understand why error states - bucket.storage.create
>>>>>>> privilege not there when the code is actually reading data from kafka
>>>>>>>
>>>>>>> Any ideas on how to debug/fix ?
>>>>>>>
>>>>>>> Anyone used the kubeflow spark-operator (v3.1.1) for streaming jobs
>>>>>>> on kubernetes ?
>>>>>>>
>>>>>>> tia!
>>>>>>>
>>>>>>> here is stack overflow ticket ->
>>>>>>>
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>

Reply via email to