Update - the answer was spark.cassandra.input.split.sizeInMB. The
default value is 512 MB. Setting it to 50 produced many more splits,
and the job ran in under 11 minutes with no timeout errors. In this
case the job was a simple count: 10 minutes 48 seconds for over 8.2
billion rows. Fast!
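For anyone hitting the same thing, here is a rough sketch of where that setting goes, reusing the session-builder snippet quoted further down in this thread (the host, port, and master values are the ones from that snippet; treat this as illustrative rather than the exact job):
```
import org.apache.spark.sql.SparkSession;

// Sketch only: smaller input splits mean more (and shorter) token-range
// scans per Spark task, keeping each read well under the driver-side timeout.
SparkSession spark = SparkSession
        .builder()
        .appName("SparkCassandraApp")
        .config("spark.cassandra.connection.host", "chaos")
        .config("spark.cassandra.connection.port", "9042")
        .config("spark.cassandra.input.split.sizeInMB", "50")
        .master("spark://aether.querymasters.com:8181")
        .getOrCreate();
```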
Good times ahead.
-Joe
On 2/8/2022 10:00 AM, Joe Obernberger wrote:
Update - I believe that for large tables, spark.cassandra.read.timeoutMS
needs to be very long - something like 4 hours or more. The job now runs
much longer, but still doesn't complete.
I'm now hitting this all-too-familiar error:
com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException:
Cassandra timeout during read query at consistency LOCAL_ONE (1
responses were required but only 0 replica responded)
In the past this has been caused by clocks being out of sync (not the
issue here) or by a table written at LOCAL_ONE instead of
LOCAL_QUORUM. I don't believe either of those is the case here. To be
sure, I ran a repair on the table overnight (it took about 17 hours to
complete). For the next test, I set
spark.cassandra.connection.timeoutMS to 60000 (the default is 5000) and
spark.cassandra.query.retry.count to -1.
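A sketch of where those settings (plus the longer read timeout mentioned above) would go on the same session builder; the values are just the ones from this email and may well need further tuning:
```
import org.apache.spark.sql.SparkSession;

// Sketch only: the timeout/retry settings under test in this message.
SparkSession spark = SparkSession
        .builder()
        .appName("SparkCassandraApp")
        .config("spark.cassandra.connection.host", "chaos")
        .config("spark.cassandra.connection.port", "9042")
        .config("spark.cassandra.read.timeoutMS", "14400000")     // ~4 hours, per the guess above
        .config("spark.cassandra.connection.timeoutMS", "60000")  // default is 5000
        .config("spark.cassandra.query.retry.count", "-1")        // retry without limit
        .master("spark://aether.querymasters.com:8181")
        .getOrCreate();
```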
Suggestions? Thoughts?
Thanks all.
-Joe
On 2/7/2022 10:35 AM, Joe Obernberger wrote:
Some more info. Tried different GC strategies - no luck.
It only happens on large tables (more than 1 billion rows). Works
fine on a 300million row table. There is very high CPU usage during
the run.
I've tried setting spark.dse.continuousPagingEnabled to false and
I've tried setting spark.cassandra.input.readsPerSec to 10; no effect.
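For completeness, a sketch of one way a read-throttling option like that can be passed, following the same options-map pattern as the count job quoted further down in this thread; passing it per-read rather than on the SparkConf is an assumption here, and the snippet reuses the SparkSession from the builder code below:
```
import java.util.HashMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch only: the same count job with the read throttle added.
// Assumes the SparkSession 'spark' created in the builder snippet below.
Dataset<Row> dataset = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .options(new HashMap<String, String>() {
            {
                put("keyspace", "doc");
                put("table", "doc");
                put("spark.cassandra.input.readsPerSec", "10"); // throttle, as tried above
            }
        })
        .load();
dataset.count();
```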
Stats:
nodetool cfstats doc.doc
Total number of tables: 82
----------------
Keyspace : doc
Read Count: 9620329
Read Latency: 0.5629605546754171 ms
Write Count: 510561482
Write Latency: 0.02805177028806885 ms
Pending Flushes: 0
Table: doc
SSTable count: 77
Old SSTable count: 0
Space used (live): 82061188941
Space used (total): 82061188941
Space used by snapshots (total): 0
Off heap memory used (total): 317037065
SSTable Compression Ratio: 0.3816525125492022
Number of partitions (estimate): 101021793
Memtable cell count: 209646
Memtable data size: 44087966
Memtable off heap memory used: 0
Memtable switch count: 10
Local read count: 25665
Local read latency: NaN ms
Local write count: 2459322
Local write latency: NaN ms
Pending flushes: 0
Percent repaired: 0.0
Bytes repaired: 0.000KiB
Bytes unrepaired: 184.869GiB
Bytes pending repair: 0.000KiB
Bloom filter false positives: 2063
Bloom filter false ratio: 0.01020
Bloom filter space used: 169249016
Bloom filter off heap memory used: 169248400
Index summary off heap memory used: 50863401
Compression metadata off heap memory used: 96925264
Compacted partition minimum bytes: 104
Compacted partition maximum bytes: 943127
Compacted partition mean bytes: 1721
Average live cells per slice (last five minutes): NaN
Maximum live cells per slice (last five minutes): 0
Average tombstones per slice (last five minutes): NaN
Maximum tombstones per slice (last five minutes): 0
Dropped Mutations: 0
nodetool tablehistograms doc.doc
doc/doc histograms
Percentile   Read Latency   Write Latency   SSTables   Partition Size   Cell Count
             (micros)       (micros)                   (bytes)
50%          0.00           0.00            0.00       1109             86
75%          0.00           0.00            0.00       3311             215
95%          0.00           0.00            0.00       3311             215
98%          0.00           0.00            0.00       3311             215
99%          0.00           0.00            0.00       3311             215
Min          0.00           0.00            0.00       104              5
Max          0.00           0.00            0.00       943127           2299
I'm stuck.
-Joe
On 2/3/2022 9:30 PM, manish khandelwal wrote:
It may be the case that you have lots of tombstones in this table, which
makes reads slow and causes timeouts during bulk reads.
On Fri, Feb 4, 2022, 03:23 Joe Obernberger
<joseph.obernber...@gmail.com> wrote:
So it turns out the value after PT is the timeout expressed in
minutes (it's an ISO-8601 duration). I changed the timeout to 960000 ms
and now get PT16M (960000 / 60000). Since I'm still getting timeouts,
something else must be wrong.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 306 in stage 0.0 failed 4 times, most recent failure: Lost task 306.3 in stage 0.0 (TID 1180) (172.16.100.39 executor 0): com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT16M
        at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT16M
        at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
-Joe
On 2/3/2022 3:30 PM, Joe Obernberger wrote:
I did find this:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md
And "spark.cassandra.read.timeoutMS" is set to 120000.
Running a test now, and I think that is it. Thank you Scott.
-Joe
On 2/3/2022 3:19 PM, Joe Obernberger wrote:
Thank you Scott!
I am using the Spark Cassandra Connector. Code:
SparkSession spark = SparkSession
        .builder()
        .appName("SparkCassandraApp")
        .config("spark.cassandra.connection.host", "chaos")
        .config("spark.cassandra.connection.port", "9042")
        .master("spark://aether.querymasters.com:8181")
        .getOrCreate();
Would I set PT2M in there? Like .config("pt2m","300") ?
I'm not familiar with jshell, so I'm not sure where you're
getting that duration from.
Right now, I'm just doing a count:
Dataset<Row> dataset = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .options(new HashMap<String, String>() {
            {
                put("keyspace", "doc");
                put("table", "doc");
            }
        })
        .load();
dataset.count();
Thank you!
-Joe
On 2/3/2022 3:01 PM, C. Scott Andreas wrote:
Hi Joe, it looks like "PT2M" may refer to a timeout value
that could be set by your Spark job's initialization of the
client. I don't see a string matching this in the Cassandra
codebase itself, but I do see that this is parseable as a
Duration.
```
jshell> java.time.Duration.parse("PT2M").getSeconds()
$7 ==> 120
```
The server-side log you see is likely an indicator of the
timeout from the server's perspective. You might consider
checking logs from the replicas for dropped reads, query
aborts due to scanning more tombstones than the configured
max, or other conditions indicating overload or an inability to
serve a response.
If you're running a Spark job, I'd recommend using the
DataStax Spark Cassandra Connector, which distributes your
query across executors, each addressing slices of the token
range that land on a replica set - avoiding the scatter-gather
behavior that can occur when using the Java driver alone.
Cheers,
– Scott
On Feb 3, 2022, at 11:42 AM, Joe Obernberger
<joseph.obernber...@gmail.com> wrote:
Hi all - I'm using Cassandra 4.0.1 and running a Spark job against a
large table (~8 billion rows), and I'm getting this error on the
client side:
Query timed out after PT2M
On the server side I see a lot of messages like:
DEBUG [Native-Transport-Requests-39] 2022-02-03 14:39:56,647
ReadCallback.java:119 - Timed out; received 0 of 1 responses
The same code works on another table in the same Cassandra cluster
that has about 300 million rows; it completes in about 2 minutes. The
cluster is 13 nodes.
I can't find what PT2M means. Perhaps the table needs a
repair? Other
ideas?
Thank you!
-Joe