Thanks for the hint.

To clarify, the job can write data to the BigQuery table; I can see the rows
in the BigQuery dashboard.

SELECT * FROM test.randomData WHERE DATE(op_time) = "2021-08-07" LIMIT 1

[
  {
    "ID": "43",
    "CLUSTERED": "0.42",
    "SCATTERED": "42.0",
    "RANDOMISED": "4.0",
    "RANDOM_STRING": "sSWjDxQSYgDMFMbGBKxNJWBzldghyjgkGFiXyMGrKlyedLdldk",
    "SMALL_VC": "      43",
    "PADDING": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "op_type": "1",
    "op_time": "2021-08-07 17:55:14.807984 UTC"
  }
]


It can also count the rows after reading the table back into a DataFrame. The
code snippet is as follows:

    def loadIntoBQTable(self, df2):
        # write to BigQuery table
        dataset = "test"
        tableName = "randomData"
        fullyQualifiedTableName = dataset + '.' + tableName
        print(f"""\n writing to BigQuery table {fullyQualifiedTableName}""")
        s.writeTableToBQ(df2, "append", dataset, tableName)   ## this works, no error
        print(f"""\n Populated BigQuery table {fullyQualifiedTableName}""")
        print("\n rows written is ", df2.count())
        print(f"""\n Reading from BigQuery table {fullyQualifiedTableName}\n""")
        # read the data back to ensure it all loaded OK
        read_df = s.loadTableFromBQ(self.spark, dataset, tableName)   ## this works, no error
        print("\n rows read in is ", read_df.count())   ## this returns the correct number of rows
        read_df.select("ID").show(20, False)   ## <-- this is the line that fails
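
For context, s.writeTableToBQ and s.loadTableFromBQ are thin helpers in my own
utility module; roughly speaking they just wrap the spark-bigquery connector
calls along these lines (simplified sketch only, and the temporaryGcsBucket
value here is a placeholder, not the real bucket):

    def writeTableToBQ(df, mode, dataset, tableName):
        # append/overwrite the DataFrame to dataset.tableName via the connector
        df.write.format("bigquery") \
            .option("temporaryGcsBucket", "my-staging-bucket") \
            .mode(mode) \
            .save(f"{dataset}.{tableName}")

    def loadTableFromBQ(spark, dataset, tableName):
        # read dataset.tableName back into a DataFrame
        return spark.read.format("bigquery") \
            .load(f"{dataset}.{tableName}")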

And this is the output, followed by the error:

 writing to BigQuery table test.randomData
 Populated BigQuery table test.randomData
 rows written is  100

 Reading from BigQuery table test.randomData
 rows read in is  1000

21/08/07 19:14:44 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 10) (10.64.1.38 executor 2): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
        at com.google.cloud.spark.bigquery.repackaged.io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
        at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:257)
        at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:247)
        at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:248)
        at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ReadChannel.readFully(ReadChannel.java:88)
        at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:692)
        at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
        at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:106)
        at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
        at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

So it can count the rows it reads back, but it fails as soon as it tries to
materialise the column data that was written.
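
One workaround I may try (an assumption on my part, not yet verified): the
spark-bigquery connector can also read via Avro instead of Arrow
(readDataFormat=AVRO), which should bypass the Netty/Arrow direct-buffer
allocation that is failing here. Something along these lines, reading the
table directly rather than through the helper:

    # hypothetical workaround sketch: force the Avro read path in the connector
    read_df = self.spark.read.format("bigquery") \
        .option("readDataFormat", "AVRO") \
        .load(f"{dataset}.{tableName}")
    read_df.select("ID").show(20, False)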

Thanks






On Sat, 7 Aug 2021 at 20:04, Artemis User <arte...@dtechspace.com> wrote:

> Without seeing the code and the whole stack trace, just a wild guess if
> you set the config param for enabling arrow
> (spark.sql.execution.arrow.pyspark.enabled)?  If not in your code, you
> would have to set it in the spark-default.conf.   Please note that the
> parameter spark.sql.execution.arrow.enabled is deprecated since Spark 3.0...
>
> -- ND
>
> On 8/7/21 2:08 PM, Mich Talebzadeh wrote:
>
>
> Hi,
>
>
>  I encounter the error:
>
>
> "java.lang.UnsupportedOperationException: sun.misc.Unsafe or
> java.nio.DirectByteBuffer.<init>(long, int) not available"
>
>
> When reading from a Google BigQuery (GBQ) table using a Kubernetes cluster
> built on Debian buster.
>
>
> The current Debian buster release on the Docker image is:
>
> root@ccf3ac45d0ed:/opt/spark/work-dir# cat /etc/*-release
>
> PRETTY_NAME="Debian GNU/Linux 10 (buster)"
>
>
> And the Java version is
>
>
> echo $JAVA_HOME
>
> /usr/local/openjdk-11
>
>
> Now according to Spark 3.1.2 doc <https://spark.apache.org/docs/latest/>
>
>
> "*For Java 11*, -Dio.netty.tryReflectionSetAccessible=true is required
> additionally for Apache Arrow library. This prevents 
> *java.lang.UnsupportedOperationException:
> sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available* when
> Apache Arrow uses Netty internally.
>
>
> So I have used it as follows:
>
>    spark-submit --verbose \
>       --properties-file ${property_file} \
>       --master k8s://https://$KUBERNETES_MASTER_IP:443 \
>       --deploy-mode cluster \
>       --name pytest \
>       --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>       --py-files $CODE_DIRECTORY/DSBQ.zip \
>       --conf spark.kubernetes.namespace=$NAMESPACE \
>       --conf spark.executor.memory=5000m \
>       --conf spark.network.timeout=300 \
>       --conf spark.executor.instances=2 \
>       --conf spark.kubernetes.driver.limit.cores=1 \
>       --conf spark.driver.cores=1 \
>       --conf spark.executor.cores=1 \
>       --conf spark.executor.memory=2000m \
>       --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>       --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>       --conf spark.kubernetes.container.image=${IMAGEGCP} \
>       --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>       --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>       --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>       $CODE_DIRECTORY/${APPLICATION}
>
> However, some comments mentioned
> <https://stackoverflow.com/questions/62109276/errorjava-lang-unsupportedoperationexception-for-pyspark-pandas-udf-documenta>
> that these parameters need to be supplied before spark-submit, so I added
> them to $SPARK_HOME/conf/spark-defaults.conf
>
>
> 185@b272bbf663e6:/opt/spark/conf$ cat spark-defaults.conf
>
> spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>
>
> spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>
> But I'm still getting the same error!
>
>
> Any ideas will be appreciated.
>
>
> Mich
>
>
>
