Hi Mich! It looks like the issue comes from the BigQuery connector (spark-bigquery-connector) and not from Spark itself. For reference, see https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/256 and https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/350; both mention a few possible solutions, one of which is sketched below.
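One of the workarounds discussed in those issues, if you want to take the Arrow read path out of the picture while you sort out the JVM flags, is to have the connector stream the results back as Avro instead of Arrow. A rough, untested sketch in PySpark (assuming a SparkSession called spark, the test.randomData table from your mail, and the connector's readDataFormat read option):

    # Read via the BigQuery connector, asking for the Avro wire format so the
    # repackaged Netty/Arrow code (which needs sun.misc.Unsafe access on
    # Java 11) is never exercised.
    read_df = (spark.read
               .format("bigquery")
               .option("table", "test.randomData")
               .option("readDataFormat", "AVRO")
               .load())

    # This is the call that currently fails with the Unsafe error.
    read_df.select("ID").show(20, False)

The other fix mentioned there is making sure -Dio.netty.tryReflectionSetAccessible=true actually reaches the driver and executor JVMs before they start (or simply running on Java 8), which is what you are already attempting below.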
Regards,
Kartik

On Sun, Aug 8, 2021 at 1:02 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Thanks for the hint.
>
> The issue is that it can write data to the BigQuery table, as I can see it
> in the BigQuery dashboard.
>
> SELECT * FROM test.randomData WHERE DATE(op_time) = "2021-08-07" LIMIT 1
>
> [
>   {
>     "ID": "43",
>     "CLUSTERED": "0.42",
>     "SCATTERED": "42.0",
>     "RANDOMISED": "4.0",
>     "RANDOM_STRING": "sSWjDxQSYgDMFMbGBKxNJWBzldghyjgkGFiXyMGrKlyedLdldk",
>     "SMALL_VC": " 43",
>     "PADDING": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
>     "op_type": "1",
>     "op_time": "2021-08-07 17:55:14.807984 UTC"
>   }
> ]
>
> It can also do a count of rows from that table after reading it into a DF.
> The code snippet is as follows:
>
>     def loadIntoBQTable(self, df2):
>         # write to BigQuery table
>         dataset = "test"
>         tableName = "randomData"
>         fullyQualifiedTableName = dataset + '.' + tableName
>         print(f"""\n writing to BigQuery table {fullyQualifiedTableName}""")
>         s.writeTableToBQ(df2, "append", dataset, tableName)  ## this works, no error
>         print(f"""\n Populated BigQuery table {fullyQualifiedTableName}""")
>         print("\n rows written is ", df2.count())
>         print(f"""\n Reading from BigQuery table {fullyQualifiedTableName}\n""")
>         # read data to ensure all loaded OK
>         read_df = s.loadTableFromBQ(self.spark, dataset, tableName)  ## this works, no error
>         print("\n rows read in is ", read_df.count())  ## this returns the correct number of rows
>         read_df.select("ID").show(20, False)  ## ! this fails
>
> And this is the output and error:
>
>     writing to BigQuery table test.randomData
>     Populated BigQuery table test.randomData
>     rows written is 100
>
>     Reading from BigQuery table test.randomData
>     rows read in is 1000
>
>     21/08/07 19:14:44 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 10) (10.64.1.38 executor 2):
>     java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
>         at com.google.cloud.spark.bigquery.repackaged.io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:257)
>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:247)
>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:248)
>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ReadChannel.readFully(ReadChannel.java:88)
>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:692)
>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:106)
>         at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
>         at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
>         at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>         at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:131)
>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>         at java.base/java.lang.Thread.run(Unknown Source)
>
> So it cannot read the data it has just written.
>
> Thanks
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Sat, 7 Aug 2021 at 20:04, Artemis User <arte...@dtechspace.com> wrote:
>
>> Without seeing the code and the whole stack trace, just a wild guess: have
>> you set the config param for enabling Arrow
>> (spark.sql.execution.arrow.pyspark.enabled)? If not in your code, you
>> would have to set it in spark-defaults.conf. Please note that the
>> parameter spark.sql.execution.arrow.enabled is deprecated since Spark 3.0.
>>
>> -- ND
>>
>> On 8/7/21 2:08 PM, Mich Talebzadeh wrote:
>>
>> Hi,
>>
>> I encounter the error:
>>
>>     "java.lang.UnsupportedOperationException: sun.misc.Unsafe or
>>     java.nio.DirectByteBuffer.<init>(long, int) not available"
>>
>> when reading from a Google BigQuery (GBQ) table using a Kubernetes cluster
>> built on Debian Buster.
>>
>> The current Debian Buster release from the docker image is:
>>
>>     root@ccf3ac45d0ed:/opt/spark/work-dir# cat /etc/*-release
>>     PRETTY_NAME="Debian GNU/Linux 10 (buster)"
>>
>> And the Java version is:
>>
>>     echo $JAVA_HOME
>>     /usr/local/openjdk-11
>>
>> Now according to the Spark 3.1.2 doc <https://spark.apache.org/docs/latest/>:
>>
>>     "For Java 11, -Dio.netty.tryReflectionSetAccessible=true is required
>>     additionally for Apache Arrow library. This prevents
>>     java.lang.UnsupportedOperationException: sun.misc.Unsafe or
>>     java.nio.DirectByteBuffer.(long, int) not available when
>>     Apache Arrow uses Netty internally."
>>
>> So I have used it as follows:
>>
>>     spark-submit --verbose \
>>       --properties-file ${property_file} \
>>       --master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>       --deploy-mode cluster \
>>       --name pytest \
>>       --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>>       --py-files $CODE_DIRECTORY/DSBQ.zip \
>>       --conf spark.kubernetes.namespace=$NAMESPACE \
>>       --conf spark.executor.memory=5000m \
>>       --conf spark.network.timeout=300 \
>>       --conf spark.executor.instances=2 \
>>       --conf spark.kubernetes.driver.limit.cores=1 \
>>       --conf spark.driver.cores=1 \
>>       --conf spark.executor.cores=1 \
>>       --conf spark.executor.memory=2000m \
>>       --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>       --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>       --conf spark.kubernetes.container.image=${IMAGEGCP} \
>>       --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>>       --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>       --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>       $CODE_DIRECTORY/${APPLICATION}
>>
>> However, some comments mentioned
>> <https://stackoverflow.com/questions/62109276/errorjava-lang-unsupportedoperationexception-for-pyspark-pandas-udf-documenta>
>> that these parameters need to be supplied before spark-submit, so I added
>> them to $SPARK_HOME/conf/spark-defaults.conf:
>>
>>     185@b272bbf663e6:/opt/spark/conf$ cat spark-defaults.conf
>>     spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>>     spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>>
>> But I'm still getting the same error!
>>
>> Any ideas will be appreciated.
>>
>> Mich
>>
>> Disclaimer: Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
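PS: regarding the spark-defaults.conf attempt quoted above, one quick sanity check is to print the effective configuration from inside the running driver, so you can see exactly what value (including any stray surrounding quote characters) the executors will be launched with. A minimal sketch, assuming the SparkSession in your DSBQ code is available as spark:

    # Dump the relevant settings as the driver actually sees them; if the flag
    # is missing or carries literal quote characters, the submission-side
    # configuration did not land the way you intended.
    conf = spark.sparkContext.getConf()
    print("executor opts:", conf.get("spark.executor.extraJavaOptions", "not set"))
    print("driver opts:  ", conf.get("spark.driver.extraJavaOptions", "not set"))

If the flag does show up correctly there and the read still fails, the two GitHub issues above are probably the best place to follow up.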