Thanks Kartik,

Yes, indeed this is a BigQuery issue with Spark. The two settings below
did not work, whether passed to spark-submit or added to
$SPARK_HOME/conf/spark-defaults.conf

 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
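
For reference, here is a minimal sketch of how the same two options would be laid out in spark-defaults.conf, where each line is a key and a value separated by whitespace. The helper name render_spark_defaults is hypothetical. It is only an assumption that dropping the surrounding double quotes matters (in a properties file they would likely be kept as part of the value), and this is not confirmed as the cause of the error.

```python
# Hypothetical helper: renders Spark properties as spark-defaults.conf lines.
# Assumption: values are written unquoted, since the properties format does
# not strip double quotes the way a shell does.

NETTY_FLAG = "-Dio.netty.tryReflectionSetAccessible=true"


def render_spark_defaults(props):
    """Return spark-defaults.conf text: one 'key value' line per property."""
    lines = []
    for key, value in props.items():
        # Drop one pair of surrounding double quotes copied from a shell command.
        if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
            value = value[1:-1]
        lines.append(f"{key} {value}")
    return "\n".join(lines) + "\n"


conf_text = render_spark_defaults({
    "spark.driver.extraJavaOptions": f'"{NETTY_FLAG}"',
    "spark.executor.extraJavaOptions": NETTY_FLAG,
})
print(conf_text, end="")
```

The quoted and unquoted inputs both end up as the same unquoted line, which is the form a properties file expects.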


I think the main cause is Java 11. On prem, in local mode with Spark 3.1.1
and Java 8, there is no problem reading from the BigQuery table:


 writing to BigQuery table test.randomData
 Populated BigQuery table test.randomData
 rows written is  100

 Reading from BigQuery table test.randomData
 rows read in is  1400
+---+
|ID |
+---+
|43 |
|57 |
|2  |
|49 |
|32 |
|46 |
|66 |
|67 |
|72 |
|89 |
|91 |
|55 |
|14 |
|28 |
|70 |
|1  |
|22 |
|54 |
|79 |
|82 |
+---+
only showing top 20 rows

The Java version on prem is

echo $JAVA_HOME
/opt/jdk1.8.0_201

And inside the Docker image used in k8s it is

echo $JAVA_HOME
/usr/local/openjdk-11
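
Since $JAVA_HOME is only a path, it can help to confirm the major version that the JVM itself reports. Below is a minimal sketch, assuming the version string comes from `java -version` or the java.version system property. The function name java_major_version is hypothetical and the 11.0.12 string is only an illustrative value. Versions before Java 9 use the legacy 1.x scheme, so 1.8.0_201 is Java 8.

```python
import re


def java_major_version(version):
    """Return the major Java version from a string such as '1.8.0_201' or '11.0.12'."""
    match = re.match(r"(\d+)(?:\.(\d+))?", version.strip())
    if match is None:
        raise ValueError(f"unrecognised Java version string: {version!r}")
    first = int(match.group(1))
    second = match.group(2)
    # Legacy scheme (Java 8 and earlier): '1.8.0_201' means major version 8.
    if first == 1 and second is not None:
        return int(second)
    return first


for v in ("1.8.0_201", "11.0.12"):
    print(v, "->", java_major_version(v))
```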

The only option seems to be adding Java 8 to the Docker image, but the latest
Debian buster does not support it!


A suggestion in one of the links you sent is


"Unless you are using Java 11 specific APIs, you can compile with Java 8
and run it on Java 11. We haven't adapted the connector to be compiled with
Java 11 (but it's on the roadmap). Another option - bring the connector in
runtime by adding it to the spark.jars or spark.jars.packages."
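
The last option in that quote could look roughly like the sketch below, which only assembles the extra spark-submit arguments. The helper name connector_args is hypothetical, and the Maven coordinate and version are assumptions that should be checked against the connector's README for your Spark and Scala build.

```python
# Assumption: coordinate of the connector build with bundled dependencies for
# Scala 2.12; the version shown is illustrative only.
CONNECTOR = "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.21.1"


def connector_args(coordinate=CONNECTOR):
    """Return spark-submit arguments that pull the connector in at runtime."""
    return ["--conf", f"spark.jars.packages={coordinate}"]


args = ["spark-submit", "--deploy-mode", "cluster"] + connector_args()
print(" ".join(args))
```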


 Cheers,


Mich

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 7 Aug 2021 at 20:45, Kartik Ohri <kartikohr...@gmail.com> wrote:

> Hi Mich!
> It looks like the issue comes from the BigQuery Connector and not Spark
> itself. For reference, see
> https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/256
> and
> https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/350.
> These issues also mention a few possible solutions.
>
> Regards
> Kartik
>
> On Sun, Aug 8, 2021 at 1:02 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Thanks for the hint.
>>
>> The issue is that it can write data to the BigQuery table as I can see it
>> in the BigQuery dashboard.
>>
>> SELECT * FROM test.randomData WHERE DATE(op_time) = "2021-08-07" LIMIT 1
>>
>> [
>>   {
>>     "ID": "43",
>>     "CLUSTERED": "0.42",
>>     "SCATTERED": "42.0",
>>     "RANDOMISED": "4.0",
>>     "RANDOM_STRING": "sSWjDxQSYgDMFMbGBKxNJWBzldghyjgkGFiXyMGrKlyedLdldk",
>>     "SMALL_VC": "      43",
>>     "PADDING": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
>>     "op_type": "1",
>>     "op_time": "2021-08-07 17:55:14.807984 UTC"
>>   }
>> ]
>>
>>
>> It can do a count of rows from that table reading the table into a DF.
>> The code snippet is as follows:
>>
>>     def loadIntoBQTable(self, df2):
>>         # write to BigQuery table
>>         dataset = "test"
>>         tableName = "randomData"
>>         fullyQualifiedTableName = dataset+'.'+tableName
>>         print(f"""\n writing to BigQuery table {fullyQualifiedTableName}""")
>>         s.writeTableToBQ(df2,"append",dataset,tableName)  ## this works no error
>>         print(f"""\n Populated BigQuery table {fullyQualifiedTableName}""")
>>         print("\n rows written is ",  df2.count())
>>         print(f"""\n Reading from BigQuery table {fullyQualifiedTableName}\n""")
>>         # read data to ensure all loaded OK
>>         read_df = s.loadTableFromBQ(self.spark, dataset, tableName)  ## This works no error
>>         print("\n rows read in is ",  read_df.count())  ## This returns correct number of rows
>>         read_df.select("ID").show(20,False)  ## ! This fails
>>
>> And this is the output and error
>>
>>  writing to BigQuery table test.randomData
>>  Populated BigQuery table test.randomData
>>  rows written is  100
>>
>>  Reading from BigQuery table test.randomData
>>  rows read in is  1000
>>
>> 21/08/07 19:14:44 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 10) (10.64.1.38 executor 2): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
>>         at com.google.cloud.spark.bigquery.repackaged.io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
>>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:257)
>>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:247)
>>         at com.google.cloud.spark.bigquery.repackaged.io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:248)
>>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ReadChannel.readFully(ReadChannel.java:88)
>>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:692)
>>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
>>         at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:106)
>>         at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
>>         at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
>>         at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>>         at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
>>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
>>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:131)
>>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
>>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>         at java.base/java.lang.Thread.run(Unknown Source)
>>
>> So it cannot read the data written
>>
>> Thanks
>>
>>
>> On Sat, 7 Aug 2021 at 20:04, Artemis User <arte...@dtechspace.com> wrote:
>>
>>> Without seeing the code and the whole stack trace, this is just a wild
>>> guess: did you set the config param for enabling Arrow
>>> (spark.sql.execution.arrow.pyspark.enabled)?  If not in your code, you
>>> would have to set it in spark-defaults.conf.  Please note that the
>>> parameter spark.sql.execution.arrow.enabled is deprecated since Spark 3.0...
>>>
>>> -- ND
>>>
>>> On 8/7/21 2:08 PM, Mich Talebzadeh wrote:
>>>
>>>
>>> Hi,
>>>
>>>
>>>  I encounter the error:
>>>
>>>
>>> "java.lang.UnsupportedOperationException: sun.misc.Unsafe or
>>> java.nio.DirectByteBuffer.<init>(long, int) not available"
>>>
>>>
>>> when reading from a Google BigQuery (GBQ) table using a Kubernetes cluster
>>> built on Debian buster.
>>>
>>>
>>> The current Debian buster release in the Docker image is:
>>>
>>> root@ccf3ac45d0ed:/opt/spark/work-dir# cat /etc/*-release
>>> PRETTY_NAME="Debian GNU/Linux 10 (buster)"
>>>
>>> And the Java version is
>>>
>>> echo $JAVA_HOME
>>> /usr/local/openjdk-11
>>>
>>>
>>> Now according to Spark 3.1.2 doc <https://spark.apache.org/docs/latest/>
>>>
>>>
>>> "*For Java 11*, -Dio.netty.tryReflectionSetAccessible=true is required
>>> additionally for Apache Arrow library. This prevents
>>> *java.lang.UnsupportedOperationException:
>>> sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available* when
>>> Apache Arrow uses Netty internally."
>>>
>>>
>>> So I have used it as follows:
>>>
>>>         spark-submit --verbose \
>>>            --properties-file ${property_file} \
>>>            --master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>>            --deploy-mode cluster \
>>>            --name pytest \
>>>            --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>>>            --py-files $CODE_DIRECTORY/DSBQ.zip \
>>>            --conf spark.kubernetes.namespace=$NAMESPACE \
>>>            --conf spark.executor.memory=5000m \
>>>            --conf spark.network.timeout=300 \
>>>            --conf spark.executor.instances=2 \
>>>            --conf spark.kubernetes.driver.limit.cores=1 \
>>>            --conf spark.driver.cores=1 \
>>>            --conf spark.executor.cores=1 \
>>>            --conf spark.executor.memory=2000m \
>>>            --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>>            --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>>            --conf spark.kubernetes.container.image=${IMAGEGCP} \
>>>            --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>>>            --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>>            --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>>            $CODE_DIRECTORY/${APPLICATION}
>>>
>>> However, some comments mentioned
>>> <https://stackoverflow.com/questions/62109276/errorjava-lang-unsupportedoperationexception-for-pyspark-pandas-udf-documenta>
>>> that these parameters need to be supplied before spark-submit, so I added
>>> them to $SPARK_HOME/conf/spark-defaults.conf
>>>
>>>
>>> 185@b272bbf663e6:/opt/spark/conf$ cat spark-defaults.conf
>>> spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>>> spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>>>
>>> But I'm still getting the same error!
>>>
>>>
>>> Any ideas will be appreciated.
>>>
>>>
>>> Mich
>>>
