I've kubeflow spark-operator installed on K8s (GKE), and i'm running a
structured streaming job which reads data from kafka .. the job is run
every 10 mins.

It is giving an error shown below:

```

Traceback (most recent call last):
  File "/opt/spark/custom-dir/main.py", line 356, in <module>
    sys.exit(main())
  File "/opt/spark/custom-dir/main.py", line 352, in main
    ss.readData()
  File "/opt/spark/custom-dir/main.py", line 327, in readData
    query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp",
"topic").writeStream \
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py",
line 1491, in start
    return self._sq(self._jwrite.start())
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py",
line 1304, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py",
line 111, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line
326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
: 
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException:
403 Forbidden
POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" :
"spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com does not
have storage.buckets.create access to the Google Cloud project.
Permission 'storage.buckets.create' denied on resource (or it may not
exist).",
    "reason" : "forbidden"
  } ],
  "message" : "spark-gcs-acc...@versa-kafka-poc.iam.gserviceaccount.com
does not have storage.buckets.create access to the Google Cloud
project. Permission 'storage.buckets.create' denied on resource (or it
may not exist)."
}


```

Code reading data from kafka on GKE :


```

df_stream = self.spark.readStream.format('kafka') \
            .option("kafka.security.protocol", "SSL") \
            .option("kafka.ssl.truststore.location",
self.ssl_truststore_location) \
            .option("kafka.ssl.truststore.password",
self.ssl_truststore_password) \
            .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
            .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
            .option("kafka.bootstrap.servers", self.kafkaBrokers) \
            .option("subscribePattern", "topic") \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .option("kafka.metadata.max.age.ms", "1000") \
            .option("kafka.ssl.keystore.type", "PKCS12") \
            .option("kafka.ssl.truststore.type", "PKCS12") \
            .load()

        logger.info(f" df_stream, calling convertToDictForEachBatch ->
{df_stream}")
        # trigger once

        query = df_stream.selectExpr("CAST(value AS STRING)",
"timestamp", "topic").writeStream \
            .outputMode("append") \
            .trigger(processingTime='10 minutes') \
            .option("truncate", "false") \
            .option("checkpointLocation", self.checkpoint) \
            .foreachBatch(self.convertToDictForEachBatch) \
            .start()

```

I'm unable to understand why error states - bucket.storage.create privilege
not there when the code is actually reading data from kafka

Any ideas on how to debug/fix ?

Anyone used the kubeflow spark-operator (v3.1.1) for streaming jobs on
kubernetes ?

tia!

here is stack overflow ticket ->

https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea

Reply via email to