Hi everyone,

I am trying to use Spark to process a large Cassandra table (~402 million
rows, 84 columns), but I am getting inconsistent results. The original
requirement was to copy some columns from this table to another table.
After copying the data, I noticed that some entries were missing from the
new table. To verify this, I took a count of the large source table, but I
get a different value each time. The same queries on a smaller table
(~7 million rows) produce consistent results.

Initially, I attempted to take the count using PySpark. Here is my PySpark
script:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Datacopy App").getOrCreate()
df = (spark.read.format("org.apache.spark.sql.cassandra")
      .options(table=sourcetable, keyspace=sourcekeyspace)
      .load()
      .cache())
df.createOrReplaceTempView("data")
query = "select count(1) from data"
vgDF = spark.sql(query)
vgDF.show(10)

The spark-submit command is as follows:

~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
    --master spark://10.128.0.18:7077 \
    --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 \
    --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" \
    --conf spark.storage.memoryFraction=1 \
    --conf spark.local.dir=/media/db/ \
    --executor-memory 10G \
    --num-executors 6 \
    --executor-cores 2 \
    --total-executor-cores 18 \
    pyspark_script.py
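One thing I have not yet tried: the connector exposes a read consistency level for input operations (it defaults to LOCAL_ONE, if I read the 2.0.x connector reference correctly), so each Spark read may hit a single replica. A sketch of the extra flag I could pass so reads go through a quorum of replicas; I have not verified that this removes the discrepancy:

```
--conf spark.cassandra.input.consistency.level=LOCAL_QUORUM
```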

The above spark-submit job takes ~90 minutes to complete. I ran it
three times, and here are the counts I got:

Spark iteration 1:  402273852
Spark iteration 2:  402273884
Spark iteration 3:  402274209

Spark shows no error or exception during the entire process. I ran the
same count in cqlsh three times and again got different results:

Cqlsh iteration 1:   402273598
Cqlsh iteration 2:   402273499
Cqlsh iteration 3:   402273515
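For reference, the cqlsh count was just the equivalent query below, run at cqlsh's default consistency level of ONE (keyspace and table names replaced with placeholders here):

```
cqlsh> SELECT count(1) FROM <keyspace>.<table>;
```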

I am unable to figure out why I am getting different results from the same
query. The Cassandra system log (*/var/log/cassandra/system.log*) showed
the following error just once:

ERROR [SSTableBatchOpen:3] 2018-02-27 09:48:23,592 CassandraDaemon.java:226 - Exception in thread Thread[SSTableBatchOpen:3,5,main]
java.lang.AssertionError: Stats component is missing for sstable /media/db/datakeyspace/sensordata1-acfa7880acba11e782fd9bf3ae460699/mc-58617-big
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:460) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:375) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader$4.run(SSTableReader.java:536) ~[apache-cassandra-3.9.jar:3.9]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

*Versions:*

   - Cassandra 3.9
   - Spark 2.1.0
   - DataStax spark-cassandra-connector 2.0.1
   - Scala 2.11

*Cluster:*

   - Spark standalone cluster with 3 worker nodes and 1 master node.
   - The same 3 worker nodes also run the Cassandra cluster.
   - Each worker node has 8 CPU cores and 40 GB RAM.

Any help will be greatly appreciated.

Thanks,
Faraz
