Hi Mich!
It looks like the issue comes from the BigQuery Connector and not Spark
itself. For reference, see
https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/256
and
https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/350.
These issues also mention a few possible solutions.
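
One workaround that has been suggested for this class of error is to ask the
connector to stream Avro instead of Arrow, so the Netty/Unsafe code path is
never hit. A rough PySpark sketch (please verify the readDataFormat option
name and its default against the connector version you are using):

    # read via Avro instead of the Arrow wire format (sketch)
    read_df = (spark.read.format("bigquery")
               .option("table", "test.randomData")
               .option("readDataFormat", "AVRO")
               .load())
    read_df.select("ID").show(20, False)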

Regards
Kartik

On Sun, Aug 8, 2021 at 1:02 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Thanks for the hint.
>
> The odd thing is that the job can write data to the BigQuery table without
> any problem, as I can see the rows in the BigQuery dashboard:
>
> SELECT * FROM test.randomData WHERE DATE(op_time) = "2021-08-07" LIMIT 1
>
> [
>   {
>     "ID": "43",
>     "CLUSTERED": "0.42",
>     "SCATTERED": "42.0",
>     "RANDOMISED": "4.0",
>     "RANDOM_STRING": "sSWjDxQSYgDMFMbGBKxNJWBzldghyjgkGFiXyMGrKlyedLdldk",
>     "SMALL_VC": "      43",
>     "PADDING": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
>     "op_type": "1",
>     "op_time": "2021-08-07 17:55:14.807984 UTC"
>   }
> ]
>
>
> It can also read that table back into a DataFrame and count its rows. The
> code snippet is as follows:
>
>     def loadIntoBQTable(self, df2):
>         # write to BigQuery table
>         dataset = "test"
>         tableName = "randomData"
>         fullyQualifiedTableName = dataset + '.' + tableName
>         print(f"""\n writing to BigQuery table {fullyQualifiedTableName}""")
>         s.writeTableToBQ(df2, "append", dataset, tableName)  ## this works, no error
>         print(f"""\n Populated BigQuery table {fullyQualifiedTableName}""")
>         print("\n rows written is ", df2.count())
>         print(f"""\n Reading from BigQuery table {fullyQualifiedTableName}\n""")
>         # read data back to ensure it all loaded OK
>         read_df = s.loadTableFromBQ(self.spark, dataset, tableName)  ## this works, no error
>         print("\n rows read in is ", read_df.count())  ## this returns the correct number of rows
>         read_df.select("ID").show(20, False)  ## ! This fails
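>
> (writeTableToBQ and loadTableFromBQ are thin wrappers around the connector's
> DataFrame writer and reader; a stripped-down sketch of roughly what they do,
> with helper details omitted:)
>
>     # approximate equivalents of the helper calls above (sketch only)
>     def writeTableToBQ(df, mode, dataset, tableName):
>         # an indirect write may also need a GCS staging bucket option
>         df.write.format("bigquery") \
>             .option("table", f"{dataset}.{tableName}") \
>             .mode(mode) \
>             .save()
>
>     def loadTableFromBQ(spark, dataset, tableName):
>         return spark.read.format("bigquery") \
>             .option("table", f"{dataset}.{tableName}") \
>             .load()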
>
> And this is the output and error
>
>  writing to BigQuery table test.randomData
>  Populated BigQuery table test.randomData
>  rows written is  100
>
>  Reading from BigQuery table test.randomData
>  rows read in is  1000
>
> 21/08/07 19:14:44 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 10) (10.64.1.38 executor 2): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
>         at com.google.cloud.spark.bigquery.repackaged.io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:257)
>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:247)
>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:248)
>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ReadChannel.readFully(ReadChannel.java:88)
>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:692)
>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:106)
>         at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
>         at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
>         at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>         at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:131)
>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>         at java.base/java.lang.Thread.run(Unknown Source)
>
> So it cannot read back the data it has just written.
>
> Thanks
>
>
>
> On Sat, 7 Aug 2021 at 20:04, Artemis User <arte...@dtechspace.com> wrote:
>
>> Without seeing the code and the whole stack trace, just a wild guess: did
>> you set the config parameter for enabling Arrow
>> (spark.sql.execution.arrow.pyspark.enabled)?  If it is not set in your code,
>> you would have to set it in spark-defaults.conf.  Please note that the
>> parameter spark.sql.execution.arrow.enabled has been deprecated since Spark 3.0...
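>>
>> For reference, a minimal way to set it from the code itself (just a sketch):
>>
>>     # enable Arrow-based transfers for PySpark (Spark 3.x property name)
>>     spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")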
>>
>> -- ND
>>
>> On 8/7/21 2:08 PM, Mich Talebzadeh wrote:
>>
>>
>> Hi,
>>
>>
>>  I encounter the error:
>>
>>
>> "java.lang.UnsupportedOperationException: sun.misc.Unsafe or
>> java.nio.DirectByteBuffer.<init>(long, int) not available"
>>
>>
>> when reading from a Google BigQuery (GBQ) table using a Kubernetes cluster
>> built on Debian Buster.
>>
>>
>> The current Debian Buster release in the Docker image is:
>>
>> root@ccf3ac45d0ed:/opt/spark/work-dir# cat /etc/*-release
>>
>> PRETTY_NAME="Debian GNU/Linux 10 (buster)"
>>
>>
>> And the Java version is
>>
>>
>> echo $JAVA_HOME
>>
>> /usr/local/openjdk-11
>>
>>
>> Now, according to the Spark 3.1.2 docs <https://spark.apache.org/docs/latest/>:
>>
>> "For Java 11, -Dio.netty.tryReflectionSetAccessible=true is required
>> additionally for Apache Arrow library. This prevents
>> java.lang.UnsupportedOperationException: sun.misc.Unsafe or
>> java.nio.DirectByteBuffer.(long, int) not available when Apache Arrow uses
>> Netty internally."
>>
>>
>> So I have used it as follows:
>>
>>         spark-submit --verbose \
>>            --properties-file ${property_file} \
>>            --master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>            --deploy-mode cluster \
>>            --name pytest \
>>            --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>>            --py-files $CODE_DIRECTORY/DSBQ.zip \
>>            --conf spark.kubernetes.namespace=$NAMESPACE \
>>            --conf spark.executor.memory=5000m \
>>            --conf spark.network.timeout=300 \
>>            --conf spark.executor.instances=2 \
>>            --conf spark.kubernetes.driver.limit.cores=1 \
>>            --conf spark.driver.cores=1 \
>>            --conf spark.executor.cores=1 \
>>            --conf spark.executor.memory=2000m \
>>            --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>            --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>            --conf spark.kubernetes.container.image=${IMAGEGCP} \
>>            --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>>            --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>            --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>            $CODE_DIRECTORY/${APPLICATION}
>>
>> However, some comments mentioned
>> <https://stackoverflow.com/questions/62109276/errorjava-lang-unsupportedoperationexception-for-pyspark-pandas-udf-documenta>
>> that these parameters need to be supplied before spark-submit starts, so I
>> also added them to $SPARK_HOME/conf/spark-defaults.conf:
>>
>>
>> 185@b272bbf663e6:/opt/spark/conf$ cat spark-defaults.conf
>>
>> spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>> spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>>
>> But I'm still getting the same error!
>>
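>> A quick way to check whether the options actually reach the running session
>> (a sketch in PySpark):
>>
>>     # print the JVM options as the live SparkConf sees them
>>     conf = spark.sparkContext.getConf()
>>     print(conf.get("spark.driver.extraJavaOptions", "not set"))
>>     print(conf.get("spark.executor.extraJavaOptions", "not set"))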
>>
>> Any ideas will be appreciated.
>>
>>
>> Mich
>>
>>
>>
