Where is the checkpoint location? Is it in GCS? The checkpoint is probably
located there, and the service account doesn't have permissions for it...
Note that the 403 in the stack trace is on POST
https://storage.googleapis.com/storage/v1/b, which is the bucket-create API,
so the GCS connector most likely could not find the checkpoint bucket and
tried to create it.
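If that is the case, a quick check is whether the bucket in the
checkpointLocation URI actually exists and is visible to the service account.
A minimal sketch, assuming the google-cloud-storage client library;
"my-spark-checkpoints" is a hypothetical placeholder for the bucket in
self.checkpoint:

```
# Sketch: verify the checkpoint bucket is visible to the service account.
# "my-spark-checkpoints" is hypothetical -- substitute the bucket from the
# gs:// URI passed as checkpointLocation.
from google.cloud import storage

client = storage.Client(project="versa-kafka-poc")
bucket = client.lookup_bucket("my-spark-checkpoints")  # None if absent/invisible
if bucket is None:
    # A missing (or invisible) bucket is what pushes the GCS connector into
    # the bucket-create call (POST /storage/v1/b) seen in the 403 above.
    print("checkpoint bucket not found: create it, or fix the checkpoint URI")
else:
    print("bucket exists: object-level access should suffice for checkpoints")
```

If the bucket already exists, granting the service account an object-level
role (e.g. roles/storage.objectAdmin) on that bucket is usually enough for
streaming checkpoints; project-wide storage.buckets.create should not be
needed.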
On Thu, Oct 3, 2024, 02:43 karan alang <karan.al...@gmail.com> wrote:

> This seems to be the cause of this ->
> github.com/kubeflow/spark-operator/issues/1619 .. the secret is not
> getting mounted due to this error -> MountVolume.SetUp failed for volume
> "spark-conf-volume-driver"
>
> I'm getting the same error in the event logs, and the mounted secret is
> not getting read.
>
> If anyone is using spark-operator and has run into this issue, please
> let me know.
> Alternatively, is there another operator that can be used?
>
> thanks!
>
>
> On Tue, Oct 1, 2024 at 3:41 PM karan alang <karan.al...@gmail.com> wrote:
>
>> I have the kubeflow spark-operator installed on K8s (GKE), and I'm
>> running a structured streaming job which reads data from Kafka. The
>> job runs every 10 mins.
>>
>> It is giving the error shown below:
>>
>> ```
>> Traceback (most recent call last):
>>   File "/opt/spark/custom-dir/main.py", line 356, in <module>
>>     sys.exit(main())
>>   File "/opt/spark/custom-dir/main.py", line 352, in main
>>     ss.readData()
>>   File "/opt/spark/custom-dir/main.py", line 327, in readData
>>     query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line 1491, in start
>>     return self._sq(self._jwrite.start())
>>   File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1304, in __call__
>>     return_value = get_return_value(
>>   File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 111, in deco
>>     return f(*a, **kw)
>>   File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326, in get_return_value
>>     raise Py4JJavaError(
>> py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
>> : com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
>> POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
>> {
>>   "code" : 403,
>>   "errors" : [ {
>>     "domain" : "global",
>>     "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
>>     "reason" : "forbidden"
>>   } ],
>>   "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist)."
>> }
>> ```
>>
>> Code reading data from Kafka on GKE:
>>
>> ```
>> df_stream = self.spark.readStream.format('kafka') \
>>     .option("kafka.security.protocol", "SSL") \
>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>     .option("subscribePattern", "topic") \
>>     .option("startingOffsets", "latest") \
>>     .option("failOnDataLoss", "false") \
>>     .option("kafka.metadata.max.age.ms", "1000") \
>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>     .load()
>>
>> logger.info(f" df_stream, calling convertToDictForEachBatch -> {df_stream}")
>> # trigger once
>>
>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>     .outputMode("append") \
>>     .trigger(processingTime='10 minutes') \
>>     .option("truncate", "false") \
>>     .option("checkpointLocation", self.checkpoint) \
>>     .foreachBatch(self.convertToDictForEachBatch) \
>>     .start()
>> ```
>>
>> I'm unable to understand why the error says the storage.buckets.create
>> permission is missing when the code is only reading data from Kafka.
>>
>> Any ideas on how to debug/fix this?
>>
>> Has anyone used the kubeflow spark-operator (v3.1.1) for streaming
>> jobs on Kubernetes?
>>
>> tia!
>>
>> Here is the Stack Overflow ticket ->
>> https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea