I have the Kubeflow spark-operator installed on K8s (GKE), and I'm running a Structured Streaming job that reads data from Kafka. The job runs every 10 minutes.
It fails with the error below:

```
Traceback (most recent call last):
  File "/opt/spark/custom-dir/main.py", line 356, in <module>
    sys.exit(main())
  File "/opt/spark/custom-dir/main.py", line 352, in main
    ss.readData()
  File "/opt/spark/custom-dir/main.py", line 327, in readData
    query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line 1491, in start
    return self._sq(self._jwrite.start())
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
    "reason" : "forbidden"
  } ],
  "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist)."
}
```

Code reading data from Kafka on GKE:

```
df_stream = self.spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
    .option("kafka.bootstrap.servers", self.kafkaBrokers) \
    .option("subscribePattern", "topic") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .load()

logger.info(f" df_stream, calling convertToDictForEachBatch -> {df_stream}")

# trigger every 10 minutes
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", self.checkpoint) \
    .foreachBatch(self.convertToDictForEachBatch) \
    .start()
```

I don't understand why the error complains about the `storage.buckets.create` permission when the code is only reading data from Kafka — the failing request in the traceback is a bucket-create POST (`https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc`). Any ideas on how to debug/fix this?

Has anyone used the Kubeflow spark-operator (v3.1.1) for streaming jobs on Kubernetes?

tia!

Stack Overflow ticket: https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea
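For what it's worth, the only place this job touches GCS directly is the `checkpointLocation`. A minimal sketch I could use to separate the GCS side from the Kafka side — the bucket name and paths below are placeholders, not my real values — would be something like:

```
# Minimal sketch of a reproducer, assuming the checkpoint is a gs:// path.
# Bucket/path names are placeholders, not the real ones from the job above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("gcs-checkpoint-repro").getOrCreate()

# Built-in "rate" source: no Kafka involved, so any failure here is GCS-only.
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

query = df.writeStream \
    .format("console") \
    .option("checkpointLocation", "gs://placeholder-checkpoint-bucket/checkpoints/repro") \
    .start()

query.awaitTermination(60)
query.stop()
```

If that stripped-down job hits the same 403, the issue is presumably on the GCS/checkpoint side (e.g. the connector attempting to create the checkpoint bucket, or the service account's permissions on it) rather than anything in the Kafka read path.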