Further to this, to avoid the issue with Java 11 I went back and installed
Java 8 in the Docker image. This was done by copying /opt/jdk1.8.0_201 into
$SPARK_HOME (cp -R), then amending the Dockerfile to copy it across to /opt
inside the image and to set JAVA_HOME as below:

RUN mkdir -p /opt/jdk1.8.0_201
COPY jdk1.8.0_201 /opt/jdk1.8.0_201
ENV JAVA_HOME=/opt/jdk1.8.0_201
ENV JRE_HOME=/opt/jdk1.8.0_201/jre
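# ENV already makes JAVA_HOME visible to later build steps and to the
# running container, so a separate "RUN export JAVA_HOME" has no effect.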

The image is now built with Java 8:

 docker run -it cfbb0e69f204 bash
185@afd549bb7050:/opt/spark/work-dir$ echo $JAVA_HOME
/opt/jdk1.8.0_201
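
Note that echo $JAVA_HOME only shows the environment variable; it does not
prove which JVM actually starts. A minimal sketch of a stricter check,
assuming Python 3.7+ is available in the image. The helper names
parse_java_major and java_major_from_java_home are hypothetical and not part
of Spark. It runs $JAVA_HOME/bin/java -version and extracts the major version,
treating the legacy 1.x numbering as Java x:

```python
import os
import re
import subprocess


def parse_java_major(version_output: str) -> int:
    """Return the major Java version found in `java -version` output."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError(f"no Java version found in: {version_output!r}")
    first, second = match.group(1), match.group(2)
    # Java 8 and earlier report themselves as 1.x (for example 1.8.0_201).
    if first == "1" and second is not None:
        return int(second)
    return int(first)


def java_major_from_java_home() -> int:
    """Run $JAVA_HOME/bin/java -version and parse the banner it prints."""
    java = os.path.join(os.environ["JAVA_HOME"], "bin", "java")
    # `java -version` writes its banner to stderr, not stdout.
    result = subprocess.run(
        [java, "-version"], capture_output=True, text=True, check=True
    )
    return parse_java_major(result.stderr)
```

Called inside the container, java_major_from_java_home() should report 8 for
the image above, provided JAVA_HOME points at a working JDK 1.8 install.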

I tested this image on minikube on-premises and it worked fine, reading
from and writing to BigQuery as expected.

Having pushed this image to Google Cloud, I am now facing an authorisation
issue that stops the job from accessing BigQuery through GKE:

 writing to BigQuery table test.randomData
An error occurred while calling o113.save.
: java.io.UncheckedIOException: Failed to create Credentials from file
        at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.connector.common.BigQueryCredentialsSupplier.createCredentialsFromFile(BigQueryCredentialsSupplier.java:63)
Caused by: java.io.FileNotFoundException: <PROJECT_ID>-224522-1a8ab25e740e.json (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)

I don't know why it is looking for that file (it is actually there). It is
supposed to use secrets!
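
One possible explanation, offered as a guess rather than a diagnosis: the
FileNotFoundException names the key file without any directory, and the JVM
resolves a relative path against the working directory of whichever process
opens it. The file can therefore exist somewhere and still not be found, for
example if the process runs from a different directory or the file sits in a
different container. A minimal sketch that can be run in the driver and
executor containers to see how a given path resolves; the helper name
describe_credentials_path is hypothetical and not part of Spark or the
connector:

```python
import os
from typing import Optional


def describe_credentials_path(path: str, cwd: Optional[str] = None) -> dict:
    """Show how a credentials path resolves and whether a file is there."""
    base = cwd if cwd is not None else os.getcwd()
    # os.path.join leaves an absolute path unchanged and resolves a
    # relative one against the chosen working directory.
    resolved = os.path.normpath(os.path.join(base, path))
    return {
        "given": path,
        "is_absolute": os.path.isabs(path),
        "resolved": resolved,
        "exists": os.path.isfile(resolved),
    }
```

If the report shows is_absolute as False and exists as False, passing the
absolute path of the mounted key (for instance the path where the Kubernetes
secret is mounted) would remove the dependence on the working directory.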



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 7 Aug 2021 at 21:49, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Thanks Kartik,
>
> Yes indeed, this is a BigQuery issue with Spark. Those two settings (below)
> did not work, either passed to spark-submit or added to
> $SPARK_HOME/conf/spark-defaults.conf
>
>  --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>  --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>
>
> I think the prime cause is Java 11: on-prem in local mode, which uses
> Spark 3.1.1 with Java 8, there is no problem reading from the BigQuery
> table:
>
>
> writing to BigQuery table test.randomData
>  Populated BigQuery table test.randomData
>  rows written is  100
>
>  Reading from BigQuery table test.randomData
>  rows read in is  1400
> +---+
> |ID |
> +---+
> |43 |
> |57 |
> |2  |
> |49 |
> |32 |
> |46 |
> |66 |
> |67 |
> |72 |
> |89 |
> |91 |
> |55 |
> |14 |
> |28 |
> |70 |
> |1  |
> |22 |
> |54 |
> |79 |
> |82 |
> +---+
> only showing top 20 rows
>
> The Java version on-prem is
>
> echo $JAVA_HOME
> /opt/jdk1.8.0_201
>
> And inside the Docker image used in k8s it is
>
> echo $JAVA_HOME
> /usr/local/openjdk-11
>
> The only option seems to be adding Java 8 to the Docker image, but the
> latest Debian buster does not support it!
>
>
> A suggestion in one of the links you sent is
>
>
> "Unless you are using Java 11 specific APIs, you can compile with Java 8
> and run it on Java 11. We haven't adapted the connector to be compiled with
> Java 11 (but it's on the roadmap). Another option - bring the connector in
> runtime by adding it to the spark.jars or spark.jars.packages."
>
>
>  Cheers,
>
>
> Mich
>
>
> On Sat, 7 Aug 2021 at 20:45, Kartik Ohri <kartikohr...@gmail.com> wrote:
>
>> Hi Mich!
>> It looks like the issue comes from the BigQuery Connector and not Spark
>> itself. For reference, see
>> https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/256
>> and
>> https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/350.
>> These issues also mention a few possible solutions.
>>
>> Regards
>> Kartik
>>
>> On Sun, Aug 8, 2021 at 1:02 AM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Thanks for the hint.
>>>
>>> The issue is not with the write: it can write data to the BigQuery table,
>>> as I can see it in the BigQuery dashboard.
>>>
>>> SELECT * FROM test.randomData WHERE DATE(op_time) = "2021-08-07" LIMIT 1
>>>
>>> [
>>>   {
>>>     "ID": "43",
>>>     "CLUSTERED": "0.42",
>>>     "SCATTERED": "42.0",
>>>     "RANDOMISED": "4.0",
>>>     "RANDOM_STRING": "sSWjDxQSYgDMFMbGBKxNJWBzldghyjgkGFiXyMGrKlyedLdldk",
>>>     "SMALL_VC": "      43",
>>>     "PADDING": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
>>>     "op_type": "1",
>>>     "op_time": "2021-08-07 17:55:14.807984 UTC"
>>>   }
>>> ]
>>>
>>>
>>> It can also read the table into a DataFrame and count the rows.
>>> The code snippet is as follows:
>>>
>>>     def loadIntoBQTable(self, df2):
>>>         # write to BigQuery table
>>>         dataset = "test"
>>>         tableName = "randomData"
>>>         fullyQualifiedTableName = dataset + '.' + tableName
>>>         print(f"""\n writing to BigQuery table {fullyQualifiedTableName}""")
>>>         s.writeTableToBQ(df2, "append", dataset, tableName)  ## this works no error
>>>         print(f"""\n Populated BigQuery table {fullyQualifiedTableName}""")
>>>         print("\n rows written is ", df2.count())
>>>         print(f"""\n Reading from BigQuery table {fullyQualifiedTableName}\n""")
>>>         # read data to ensure all loaded OK
>>>         read_df = s.loadTableFromBQ(self.spark, dataset, tableName)  ## This works no error
>>>         print("\n rows read in is ", read_df.count())  ## This returns correct number of rows
>>>         read_df.select("ID").show(20, False)  ## ! This fails
>>>
>>> And this is the output and error
>>>
>>>  writing to BigQuery table test.randomData
>>>  Populated BigQuery table test.randomData
>>>  rows written is  100
>>>
>>>  Reading from BigQuery table test.randomData
>>>  rows read in is  1000
>>>
>>> 21/08/07 19:14:44 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 10) (10.64.1.38 executor 2): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
>>>         at com.google.cloud.spark.bigquery.repackaged.io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
>>>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:257)
>>>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:247)
>>>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:248)
>>>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ReadChannel.readFully(ReadChannel.java:88)
>>>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:692)
>>>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
>>>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:106)
>>>         at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
>>>         at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
>>>         at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>>>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>>>         at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
>>>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
>>>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:131)
>>>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>>>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
>>>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>         at java.base/java.lang.Thread.run(Unknown Source)
>>>
>>> So it can count the rows but cannot read the data that was written.
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Sat, 7 Aug 2021 at 20:04, Artemis User <arte...@dtechspace.com>
>>> wrote:
>>>
>>>> Without seeing the code and the whole stack trace, this is just a wild
>>>> guess: did you set the config parameter for enabling Arrow
>>>> (spark.sql.execution.arrow.pyspark.enabled)? If it is not set in your
>>>> code, you would have to set it in spark-defaults.conf. Please note that
>>>> the parameter spark.sql.execution.arrow.enabled has been deprecated since
>>>> Spark 3.0...
>>>>
>>>> -- ND
>>>>
>>>> On 8/7/21 2:08 PM, Mich Talebzadeh wrote:
>>>>
>>>>
>>>> Hi,
>>>>
>>>>
>>>>  I encounter the error:
>>>>
>>>>
>>>> "java.lang.UnsupportedOperationException: sun.misc.Unsafe or
>>>> java.nio.DirectByteBuffer.<init>(long, int) not available"
>>>>
>>>>
>>>> when reading from a Google BigQuery (GBQ) table using a Kubernetes cluster
>>>> built on Debian buster.
>>>>
>>>>
>>>> The current Debian release in the Docker image is:
>>>>
>>>> root@ccf3ac45d0ed:/opt/spark/work-dir# cat /etc/*-release
>>>> PRETTY_NAME="Debian GNU/Linux 10 (buster)"
>>>>
>>>>
>>>> And the Java version is
>>>>
>>>> echo $JAVA_HOME
>>>> /usr/local/openjdk-11
>>>>
>>>>
>>>> Now according to Spark 3.1.2 doc
>>>> <https://spark.apache.org/docs/latest/>
>>>>
>>>>
>>>> "*For Java 11*, -Dio.netty.tryReflectionSetAccessible=true is required
>>>> additionally for Apache Arrow library. This prevents
>>>> *java.lang.UnsupportedOperationException: sun.misc.Unsafe or
>>>> java.nio.DirectByteBuffer.(long, int) not available* when Apache Arrow
>>>> uses Netty internally."
>>>>
>>>>
>>>> So I have used it as follows:
>>>>
>>>>         spark-submit --verbose \
>>>>            --properties-file ${property_file} \
>>>>            --master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>>>            --deploy-mode cluster \
>>>>            --name pytest \
>>>>            --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>>>>            --py-files $CODE_DIRECTORY/DSBQ.zip \
>>>>            --conf spark.kubernetes.namespace=$NAMESPACE \
>>>>            --conf spark.executor.memory=5000m \
>>>>            --conf spark.network.timeout=300 \
>>>>            --conf spark.executor.instances=2 \
>>>>            --conf spark.kubernetes.driver.limit.cores=1 \
>>>>            --conf spark.driver.cores=1 \
>>>>            --conf spark.executor.cores=1 \
>>>>            --conf spark.executor.memory=2000m \
>>>>            --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>>>            --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>>>            --conf spark.kubernetes.container.image=${IMAGEGCP} \
>>>>            --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>>>>            --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>>>            --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>>>            $CODE_DIRECTORY/${APPLICATION}
>>>>
>>>> However, some comments mentioned
>>>> <https://stackoverflow.com/questions/62109276/errorjava-lang-unsupportedoperationexception-for-pyspark-pandas-udf-documenta>
>>>> that these parameters need to be supplied before spark-submit, so I added
>>>> them to $SPARK_HOME/conf/spark-defaults.conf
>>>>
>>>>
>>>> 185@b272bbf663e6:/opt/spark/conf$ cat spark-defaults.conf
>>>> spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>>>> spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>>>>
>>>> But I'm still getting the same error!
>>>>
>>>>
>>>> Any ideas will be appreciated.
>>>>
>>>>
>>>> Mich
>>>>
>>>>
>>>>
>>>>
