Where is the checkpoint location? Is it not in GCS? The checkpoint is most
likely stored there, and the service account doesn't have the permissions it
needs for it...
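
If the checkpoint is on GCS, a quick way to confirm is to check whether the
checkpoint bucket exists and whether the job's service account can write
objects into it. A minimal sketch with the google-cloud-storage client (the
bucket name below is hypothetical, use the one from your checkpointLocation):

```
# Minimal sketch: verify the checkpoint bucket exists and is writable by the
# same service account the job runs as (bucket name below is hypothetical).
from google.cloud import storage

client = storage.Client()                       # uses the job's service account credentials
bucket = client.bucket("my-checkpoint-bucket")  # bucket from your checkpointLocation

if not bucket.exists():
    # If the bucket does not exist, the GCS connector tries to create it,
    # which needs storage.buckets.create on the project -> the 403 you see.
    print("checkpoint bucket does not exist")
else:
    # Writing objects only needs object-level permissions on the bucket.
    bucket.blob("checkpoints/_probe").upload_from_string("ok")
    print("service account can write to the checkpoint bucket")
```

If the bucket is missing, create it once up front (or point checkpointLocation
at an existing bucket) and grant the service account object-level access on it,
e.g. roles/storage.objectAdmin, instead of storage.buckets.create on the
whole project.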

On Thu, Oct 3, 2024, 02:43 karan alang <karan.al...@gmail.com> wrote:

> This seems to be the cause of this ->
> github.com/kubeflow/spark-operator/issues/1619 .. the secret is not
> getting mounted due to this error -> MountVolume.SetUp failed for volume
> "spark-conf-volume-driver"
>
> I'm getting the same error in the event logs, and the mounted secret is not
> getting read.
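>
> One way to confirm the mount failure is to list the driver pod's events with
> the kubernetes Python client; a minimal sketch (namespace and pod name below
> are hypothetical):
>
> ```
> # Minimal sketch: list events on the Spark driver pod to see the
> # MountVolume.SetUp failure (namespace and pod name are hypothetical).
> from kubernetes import client, config
>
> config.load_kube_config()   # or config.load_incluster_config() inside the cluster
> v1 = client.CoreV1Api()
>
> events = v1.list_namespaced_event(
>     namespace="spark-jobs",
>     field_selector="involvedObject.name=my-streaming-app-driver",
> )
> for e in events.items:
>     print(e.reason, "-", e.message)
> ```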
>
> If anyone using spark-operator has run into this issue, please let me
> know.
> Alternatively, is there another operator that can be used?
>
> thanks!
>
>
> On Tue, Oct 1, 2024 at 3:41 PM karan alang <karan.al...@gmail.com> wrote:
>
>> I have the kubeflow spark-operator installed on K8s (GKE), and I'm running a
>> structured streaming job which reads data from Kafka; the job runs
>> every 10 minutes.
>>
>> It is failing with the error shown below:
>>
>> ```
>> Traceback (most recent call last):
>>   File "/opt/spark/custom-dir/main.py", line 356, in <module>
>>     sys.exit(main())
>>   File "/opt/spark/custom-dir/main.py", line 352, in main
>>     ss.readData()
>>   File "/opt/spark/custom-dir/main.py", line 327, in readData
>>     query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line 1491, in start
>>     return self._sq(self._jwrite.start())
>>   File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1304, in __call__
>>     return_value = get_return_value(
>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 111, in deco
>>     return f(*a, **kw)
>>   File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326, in get_return_value
>>     raise Py4JJavaError(
>> py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
>> : com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
>> POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
>> {
>>   "code" : 403,
>>   "errors" : [ {
>>     "domain" : "global",
>>     "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
>>     "reason" : "forbidden"
>>   } ],
>>   "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist)."
>> }
>> ```
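>>
>> Note that the failing call in the trace is a POST to
>> https://storage.googleapis.com/storage/v1/b, which is the JSON API endpoint
>> for creating a bucket, not for reading or writing objects. The same 403 can
>> be reproduced outside Spark with the google-cloud-storage client (bucket
>> name below is hypothetical):
>>
>> ```
>> # Minimal sketch: reproduce the bucket-create 403 outside Spark, using the
>> # same service account (the bucket name is hypothetical).
>> from google.cloud import storage
>> from google.api_core.exceptions import Forbidden
>>
>> client = storage.Client()
>> try:
>>     client.create_bucket("some-nonexistent-bucket")   # needs storage.buckets.create
>> except Forbidden as exc:
>>     print(exc)   # same "storage.buckets.create ... denied" message as above
>> ```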
>>
>> Code reading data from Kafka on GKE:
>>
>>
>> ```
>> df_stream = self.spark.readStream.format('kafka') \
>>             .option("kafka.security.protocol", "SSL") \
>>             .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>             .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>             .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
>>             .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
>>             .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>             .option("subscribePattern", "topic") \
>>             .option("startingOffsets", "latest") \
>>             .option("failOnDataLoss", "false") \
>>             .option("kafka.metadata.max.age.ms", "1000") \
>>             .option("kafka.ssl.keystore.type", "PKCS12") \
>>             .option("kafka.ssl.truststore.type", "PKCS12") \
>>             .load()
>>
>>         logger.info(f" df_stream, calling convertToDictForEachBatch -> {df_stream}")
>>         # trigger once
>>
>>         query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>             .outputMode("append") \
>>             .trigger(processingTime='10 minutes') \
>>             .option("truncate", "false") \
>>             .option("checkpointLocation", self.checkpoint) \
>>             .foreachBatch(self.convertToDictForEachBatch) \
>>             .start()
>> ```
>>
>> I'm unable to understand why the error says the storage.buckets.create
>> permission is missing when the code is only reading data from Kafka.
>>
>> Any ideas on how to debug/fix this?
>>
>> Has anyone used the kubeflow spark-operator (v3.1.1) for streaming jobs on
>> Kubernetes?
>>
>> tia!
>>
>> Here is the Stack Overflow ticket ->
>>
>>
>> https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea
