Sorted out :) I removed all references to the JSON service account key file, i.e.
    spark.conf.set("GcpJsonKeyFile", config['GCPVariables']['jsonKeyFile'])

from the code, and it worked:

writing to BigQuery table test.randomData
Populated BigQuery table test.randomData
rows written is 100

Reading from BigQuery table test.randomData
rows read in is 2300
+---+
|ID |
+---+
|43 |
|57 |
|2  |
|49 |
|32 |
|46 |
|66 |
|67 |
|72 |
|89 |
|91 |
|55 |
|14 |
|28 |
|70 |
|1  |
|22 |
|54 |
|79 |
|82 |
+---+
only showing top 20 rows

I assume that it checks for JSON service account key file references before
checking secrets!

So, in summary, PySpark 3.1.1 with Java 8 and spark-bigquery-latest_2.12.jar
works fine inside the Docker image. The problem is that Debian buster no
longer supports Java 8.

HTH

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

On Sun, 8 Aug 2021 at 18:58, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Further on this: to avoid the issue with Java 11, I went back and installed
> Java 8 in the Docker image. This was done simply by copying
> /opt/jdk1.8.0_201 into $SPARK_HOME (cp -R), amending the Dockerfile to copy
> it across to /opt inside the image, and setting JAVA_HOME as below:
>
> RUN mkdir -p /opt/jdk1.8.0_201
> COPY jdk1.8.0_201 /opt/jdk1.8.0_201
> ENV JAVA_HOME=/opt/jdk1.8.0_201
> ENV JRE_HOME=/opt/jdk1.8.0_201/jre
> # ENV already persists JAVA_HOME in the image; a "RUN export JAVA_HOME" is a no-op
>
> Now the image is built on Java 8:
>
> docker run -it cfbb0e69f204 bash
> 185@afd549bb7050:/opt/spark/work-dir$ echo $JAVA_HOME
> /opt/jdk1.8.0_201
>
> And I tested this image on minikube on-premises, and it worked fine, reading
> and writing to BigQuery as expected.
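The thread turns on whether the driver and executor images run Java 8 or Java 11, so a quick check inside a container can help. The helpers below are a hypothetical sketch and are not part of Spark or the connector. `installed_java_version` runs `java -version`, which prints its banner on stderr. `java_major_version` maps both the legacy `1.8.0_201` scheme and the newer `11.0.12` scheme to a major version number.

```python
import os
import re
import subprocess


def java_major_version(version: str) -> int:
    """Return the major version for strings like '1.8.0_201', '11.0.12' or '17'."""
    match = re.match(r"(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"unrecognised Java version string: {version!r}")
    first = int(match.group(1))
    # Java 8 and earlier use the "1.<major>" scheme; Java 9+ put the major version first.
    if first == 1 and match.group(2) is not None:
        return int(match.group(2))
    return first


def installed_java_version(java_bin: str = "java") -> str:
    """Run `<java_bin> -version` and return the quoted version string it reports."""
    result = subprocess.run(
        [java_bin, "-version"], capture_output=True, text=True, check=True
    )
    # The JVM writes e.g. 'openjdk version "11.0.12" 2021-07-20' to stderr.
    match = re.search(r'version "([^"]+)"', result.stderr)
    if match is None:
        raise ValueError("no version string found in `java -version` output")
    return match.group(1)


def java_home_binary() -> str:
    """Path of the java binary under $JAVA_HOME, or plain 'java' (from PATH) if unset."""
    java_home = os.environ.get("JAVA_HOME")
    return os.path.join(java_home, "bin", "java") if java_home else "java"
```

Inside the rebuilt image, `java_major_version(installed_java_version(java_home_binary()))` should return 8. The original `/usr/local/openjdk-11` image should give 11.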
>
> Having pushed this image to Google Cloud, I am facing an authorisation
> issue that stops the job from accessing BigQuery through GKE:
>
> writing to BigQuery table test.randomData
> An error occurred while calling o113.save.
> : java.io.UncheckedIOException: Failed to create Credentials from file
>     at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.connector.common.BigQueryCredentialsSupplier.createCredentialsFromFile(BigQueryCredentialsSupplier.java:63)
>
> Caused by: java.io.FileNotFoundException: <PROJECT_ID>-224522-1a8ab25e740e.json (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>
> I don't know why it is looking for that file (it is actually there). It is
> supposed to use secrets!
>
> On Sat, 7 Aug 2021 at 21:49, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Thanks Kartik,
>>
>> Yes, indeed this is a BigQuery issue with Spark.
>> Those two settings (below) did not work, either in spark-submit or when
>> added to $SPARK_HOME/conf/spark-defaults.conf:
>>
>> --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>> --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>
>> I think the main reason is Java 11. On-premises, in local mode with Spark
>> 3.1.1 and Java 8, there is no problem reading from the BigQuery table:
>>
>> writing to BigQuery table test.randomData
>> Populated BigQuery table test.randomData
>> rows written is 100
>>
>> Reading from BigQuery table test.randomData
>> rows read in is 1400
>> +---+
>> |ID |
>> +---+
>> |43 |
>> |57 |
>> |2  |
>> |49 |
>> |32 |
>> |46 |
>> |66 |
>> |67 |
>> |72 |
>> |89 |
>> |91 |
>> |55 |
>> |14 |
>> |28 |
>> |70 |
>> |1  |
>> |22 |
>> |54 |
>> |79 |
>> |82 |
>> +---+
>> only showing top 20 rows
>>
>> The Java version on-premises is
>>
>> echo $JAVA_HOME
>> /opt/jdk1.8.0_201
>>
>> and inside the Docker image used in k8s it is
>>
>> echo $JAVA_HOME
>> /usr/local/openjdk-11
>>
>> The only option seems to be adding Java 8 to the Docker image, but the
>> latest Debian buster does not support it!
>>
>> A suggestion in one of the links you sent is:
>>
>> "Unless you are using Java 11 specific APIs, you can compile with Java 8
>> and run it on Java 11. We haven't adapted the connector to be compiled with
>> Java 11 (but it's on the roadmap). Another option - bring the connector in
>> runtime by adding it to the spark.jars or spark.jars.packages."
>>
>> Cheers,
>>
>> Mich
>>
>> On Sat, 7 Aug 2021 at 20:45, Kartik Ohri <kartikohr...@gmail.com> wrote:
>>
>>> Hi Mich!
>>> It looks like the issue comes from the BigQuery Connector and not Spark
>>> itself. For reference, see
>>> https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/256
>>> and
>>> https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/350.
>>> These issues also mention a few possible solutions.
>>>
>>> Regards
>>> Kartik
>>>
>>> On Sun, Aug 8, 2021 at 1:02 AM Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the hint.
>>>>
>>>> The issue is that it can write data to the BigQuery table, as I can see
>>>> it in the BigQuery dashboard:
>>>>
>>>> SELECT * FROM test.randomData WHERE DATE(op_time) = "2021-08-07" LIMIT 1
>>>>
>>>> [
>>>>   {
>>>>     "ID": "43",
>>>>     "CLUSTERED": "0.42",
>>>>     "SCATTERED": "42.0",
>>>>     "RANDOMISED": "4.0",
>>>>     "RANDOM_STRING": "sSWjDxQSYgDMFMbGBKxNJWBzldghyjgkGFiXyMGrKlyedLdldk",
>>>>     "SMALL_VC": " 43",
>>>>     "PADDING": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
>>>>     "op_type": "1",
>>>>     "op_time": "2021-08-07 17:55:14.807984 UTC"
>>>>   }
>>>> ]
>>>>
>>>> It can also count the rows of that table after reading it into a DF.
>>>> The code snippet is as follows:
>>>>
>>>>     def loadIntoBQTable(self, df2):
>>>>         # s is the author's BigQuery helper module (not shown in this thread)
>>>>         # write to BigQuery table
>>>>         dataset = "test"
>>>>         tableName = "randomData"
>>>>         fullyQualifiedTableName = dataset + '.' + tableName
>>>>         print(f"""\n writing to BigQuery table {fullyQualifiedTableName}""")
>>>>         s.writeTableToBQ(df2, "append", dataset, tableName)  ## this works, no error
>>>>         print(f"""\n Populated BigQuery table {fullyQualifiedTableName}""")
>>>>         print("\n rows written is ", df2.count())
>>>>         print(f"""\n Reading from BigQuery table {fullyQualifiedTableName}\n""")
>>>>         # read data to ensure all loaded OK
>>>>         read_df = s.loadTableFromBQ(self.spark, dataset, tableName)  ## This works, no error
>>>>         print("\n rows read in is ", read_df.count())  ## This returns the correct number of rows
>>>>         read_df.select("ID").show(20, False)  ## ! This fails
>>>>
>>>> And this is the output and error:
>>>>
>>>> writing to BigQuery table test.randomData
>>>> Populated BigQuery table test.randomData
>>>> rows written is 100
>>>>
>>>> Reading from BigQuery table test.randomData
>>>> rows read in is 1000
>>>>
>>>> 21/08/07 19:14:44 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 10) (10.64.1.38 executor 2): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
>>>>     at com.google.cloud.spark.bigquery.repackaged.io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
>>>>     at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:257)
>>>>     at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:247)
>>>>     at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:248)
>>>>     at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ReadChannel.readFully(ReadChannel.java:88)
>>>>     at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:692)
>>>>     at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
>>>>     at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:106)
>>>>     at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
>>>>     at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
>>>>     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
>>>>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>>>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>>>>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>>>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>>>>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
>>>>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
>>>>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:131)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>>>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>     at java.base/java.lang.Thread.run(Unknown Source)
>>>>
>>>> So it cannot read the data it has written.
>>>>
>>>> Thanks
>>>>
>>>> On Sat, 7 Aug 2021 at 20:04, Artemis User <arte...@dtechspace.com>
>>>> wrote:
>>>>
>>>>> Without seeing the code and the whole stack trace, just a wild guess:
>>>>> have you set the config param for enabling Arrow
>>>>> (spark.sql.execution.arrow.pyspark.enabled)? If it is not in your code,
>>>>> you would have to set it in spark-defaults.conf. Please note that the
>>>>> parameter spark.sql.execution.arrow.enabled is deprecated since Spark
>>>>> 3.0...
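Artemis's note about the renamed Arrow setting can be made concrete with a small, hypothetical helper. It rewrites the pre-3.0 Arrow keys to their Spark 3.x names before they are applied with `SparkSession.builder.config(key, value)`, and covers only the two Arrow keys renamed in Spark 3.0. As far as I know, these keys govern Arrow in PySpark's pandas conversions (`toPandas()`, `createDataFrame` from pandas) rather than the connector's own Arrow read path.

```python
from typing import Dict

# Pre-3.0 Arrow keys and their Spark 3.x replacements.
DEPRECATED_ARROW_KEYS: Dict[str, str] = {
    "spark.sql.execution.arrow.enabled": "spark.sql.execution.arrow.pyspark.enabled",
    "spark.sql.execution.arrow.fallback.enabled": "spark.sql.execution.arrow.pyspark.fallback.enabled",
}


def normalize_arrow_conf(conf: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of `conf` with deprecated Arrow keys renamed.

    If both the old and the new key are present, the new key's value wins.
    """
    result: Dict[str, str] = {}
    for key, value in conf.items():
        new_key = DEPRECATED_ARROW_KEYS.get(key)
        if new_key is None:
            result[key] = value  # not a deprecated key: keep as-is
        elif new_key not in conf:
            result[new_key] = value  # rename the old key to its 3.x name
        # otherwise drop the old key; the new key is copied on its own iteration
    return result
```

The normalised pairs can then be applied one by one, e.g. `for key, value in normalize_arrow_conf(settings).items(): builder = builder.config(key, value)`.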
>>>>>
>>>>> -- ND
>>>>>
>>>>> On 8/7/21 2:08 PM, Mich Talebzadeh wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I encounter the error:
>>>>>
>>>>> "java.lang.UnsupportedOperationException: sun.misc.Unsafe or
>>>>> java.nio.DirectByteBuffer.<init>(long, int) not available"
>>>>>
>>>>> when reading from a Google BigQuery (GBQ) table using a Kubernetes
>>>>> cluster built on Debian buster.
>>>>>
>>>>> The current Debian buster release in the Docker image is:
>>>>>
>>>>> root@ccf3ac45d0ed:/opt/spark/work-dir# cat /etc/*-release
>>>>> PRETTY_NAME="Debian GNU/Linux 10 (buster)"
>>>>>
>>>>> And the Java version is
>>>>>
>>>>> echo $JAVA_HOME
>>>>> /usr/local/openjdk-11
>>>>>
>>>>> Now, according to the Spark 3.1.2 doc
>>>>> <https://spark.apache.org/docs/latest/>:
>>>>>
>>>>> "*For Java 11*, -Dio.netty.tryReflectionSetAccessible=true is
>>>>> required additionally for Apache Arrow library. This prevents
>>>>> *java.lang.UnsupportedOperationException:
>>>>> sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available*
>>>>> when Apache Arrow uses Netty internally."
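The Spark documentation quoted above requires the Netty flag on both the driver and the executor JVMs. One pitfall is that `--conf spark.driver.extraJavaOptions=...` replaces any value already set for that key instead of appending to it, for example a value from the file passed with `--properties-file`. The sketch below merges the flag into existing options and emits the two `--conf` arguments. The function names are hypothetical.

```python
import shlex
from typing import Dict, List

NETTY_FLAG = "-Dio.netty.tryReflectionSetAccessible=true"
JAVA_OPTION_KEYS = ("spark.driver.extraJavaOptions", "spark.executor.extraJavaOptions")


def merge_java_option(existing: str, flag: str = NETTY_FLAG) -> str:
    """Append `flag` to a JVM options string unless it is already present."""
    # shlex.split also strips shell-style quotes, e.g. '"-Dfoo=true"' -> '-Dfoo=true'.
    options = shlex.split(existing) if existing else []
    if flag not in options:
        options.append(flag)
    return " ".join(shlex.quote(opt) for opt in options)


def netty_conf_args(current: Dict[str, str]) -> List[str]:
    """Build spark-submit `--conf` arguments adding the Netty flag for driver and executors."""
    args: List[str] = []
    for key in JAVA_OPTION_KEYS:
        args += ["--conf", f"{key}={merge_java_option(current.get(key, ''))}"]
    return args
```

Passing the list straight to `subprocess.run` needs no extra quoting. `shlex.join(["spark-submit", *netty_conf_args({})])` (Python 3.8+) gives a shell-ready fragment instead.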
>>>>>
>>>>> So I have used it as follows:
>>>>>
>>>>> spark-submit --verbose \
>>>>>    --properties-file ${property_file} \
>>>>>    --master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>>>>    --deploy-mode cluster \
>>>>>    --name pytest \
>>>>>    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>>>>>    --py-files $CODE_DIRECTORY/DSBQ.zip \
>>>>>    --conf spark.kubernetes.namespace=$NAMESPACE \
>>>>>    --conf spark.executor.memory=5000m \
>>>>>    --conf spark.network.timeout=300 \
>>>>>    --conf spark.executor.instances=2 \
>>>>>    --conf spark.kubernetes.driver.limit.cores=1 \
>>>>>    --conf spark.driver.cores=1 \
>>>>>    --conf spark.executor.cores=1 \
>>>>>    --conf spark.executor.memory=2000m \
>>>>>    --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>>>>    --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>>>>    --conf spark.kubernetes.container.image=${IMAGEGCP} \
>>>>>    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>>>>>    --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>>>>    --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>>>>    $CODE_DIRECTORY/${APPLICATION}
>>>>>
>>>>> However, some comments
>>>>> <https://stackoverflow.com/questions/62109276/errorjava-lang-unsupportedoperationexception-for-pyspark-pandas-udf-documenta>
>>>>> mentioned that these parameters need to be supplied before spark-submit,
>>>>> so I added them to $SPARK_HOME/conf/spark-defaults.conf:
>>>>>
>>>>> 185@b272bbf663e6:/opt/spark/conf$ cat spark-defaults.conf
>>>>> spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>>>>> spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>>>>>
>>>>> But I'm still getting the same error!
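The command above passes `spark.executor.memory` twice, first as 5000m and then as 2000m. As far as I know, spark-submit keeps the last value for a repeated `--conf` key, so the executors would get 2000m. A hypothetical checker like the one below can flag such repeats in a submit script. It only understands the `--conf KEY=VALUE` form used in this thread and expects the argument list as the shell would pass it.

```python
from collections import defaultdict
from typing import Dict, List


def conf_occurrences(argv: List[str]) -> Dict[str, List[str]]:
    """Map each `--conf KEY=VALUE` key in a spark-submit argv to its values, in order."""
    seen: Dict[str, List[str]] = defaultdict(list)
    # Pair every argument with the one that follows it.
    for flag, pair in zip(argv, argv[1:]):
        if flag == "--conf":
            # Split on the first '=' only, so values such as "-Dx=true" stay intact.
            key, sep, value = pair.partition("=")
            if sep:  # ignore malformed entries without '='
                seen[key].append(value)
    return dict(seen)


def duplicate_confs(argv: List[str]) -> Dict[str, List[str]]:
    """Return only the keys that were set more than once."""
    return {key: values for key, values in conf_occurrences(argv).items() if len(values) > 1}
```

Running `duplicate_confs` on the command's arguments should report `spark.executor.memory` with both values. That makes it easy to decide which one was intended.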
>>>>>
>>>>> Any ideas will be appreciated.
>>>>>
>>>>> Mich
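The resolution reported at the top of the thread was to drop the JSON key-file setting. With that setting gone, the connector on GKE uses the credentials available to the pod. A hypothetical helper can keep one code path for both environments by adding the connector's `credentialsFile` option only when a key file is configured. It also resolves the path to an absolute one first. The earlier `FileNotFoundException` shows a bare file name, which the JVM resolves against the process working directory (apparently `/opt/spark/work-dir` in these images). `bq_options` is an illustration, not part of the connector, and the sketch assumes the connector falls back to default credentials when `credentialsFile` is absent, as the thread's outcome suggests.

```python
import os
from typing import Dict, Optional


def bq_options(table: str, key_file: Optional[str] = None) -> Dict[str, str]:
    """Build spark-bigquery-connector read options for `table`.

    With no key file, no credentials option is set, leaving the connector to use
    the environment's default credentials (assumed behaviour, see lead-in).
    """
    options = {"table": table}
    if key_file:
        # Resolve relative paths against the current working directory up front,
        # so a missing file is reported with its full path.
        path = os.path.abspath(os.path.expanduser(key_file))
        if not os.path.isfile(path):
            raise FileNotFoundError(f"service account key file not found: {path}")
        options["credentialsFile"] = path
    return options
```

Usage would be along the lines of `spark.read.format("bigquery").options(**bq_options("test.randomData", key_file)).load()`, with `key_file=None` on GKE and a local path on-premises. Raising on a configured-but-missing file rather than silently falling back is a design choice. It surfaces a misconfigured path instead of quietly switching credentials.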