Update - the answer was spark.cassandra.input.split.sizeInMB. The default value is 512 MB.  Setting it to 50 resulted in a lot more splits, and the job ran in under 11 minutes with no timeout errors.  In this case the job was a simple count: 10 minutes 48 seconds for over 8.2 billion rows.  Fast!
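
For anyone who hits this later, here is roughly where that setting goes - a minimal sketch built from the same SparkSession setup and count quoted further down the thread (the host, port, and master URL are specific to my cluster, and the class name is only for this example):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkCassandraCount {
    public static void main(String[] args) {
        // Same setup as the code quoted below, plus the input split size setting.
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkCassandraApp")
                .config("spark.cassandra.connection.host", "chaos")
                .config("spark.cassandra.connection.port", "9042")
                // default is 512 MB; 50 produced many more splits and no timeouts here
                .config("spark.cassandra.input.split.sizeInMB", "50")
                .master("spark://aether.querymasters.com:8181")
                .getOrCreate();

        // Simple count over the doc.doc table, as in the quoted code below.
        Dataset<Row> dataset = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "doc")
                .option("table", "doc")
                .load();

        System.out.println(dataset.count());
    }
}
```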

Good times ahead.

-Joe

On 2/8/2022 10:00 AM, Joe Obernberger wrote:

Update - I believe that for large tables, spark.cassandra.read.timeoutMS needs to be very long - on the order of 4 hours or more.  The job now runs much longer, but still doesn't complete.  I'm now facing this all-too-familiar error: com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException: Cassandra timeout during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded)

In the past this has been due to clocks being out of sync (not the issue here), or a table that was written to with LOCAL_ONE instead of LOCAL_QUORUM.  I don't believe either of those is the case.  To be sure, I ran a repair on the table overnight (it took about 17 hours to complete).  For the next test, I set spark.cassandra.connection.timeoutMS to 60000 (default is 5000) and spark.cassandra.query.retry.count to -1.
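
A minimal sketch of where those settings live, assuming they are passed as Spark configs on the same builder used elsewhere in this thread (14400000 is just "4 hours" expressed in milliseconds, and the class name is only for the example):

```java
import org.apache.spark.sql.SparkSession;

public class TimeoutConfigSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkCassandraApp")
                .config("spark.cassandra.connection.host", "chaos")
                .config("spark.cassandra.connection.port", "9042")
                // test values from this run, as described above - not tuned recommendations
                .config("spark.cassandra.read.timeoutMS", "14400000")    // ~4 hours
                .config("spark.cassandra.connection.timeoutMS", "60000") // default is 5000
                .config("spark.cassandra.query.retry.count", "-1")
                .master("spark://aether.querymasters.com:8181")
                .getOrCreate();
    }
}
```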

Suggestions?  Thoughts?

Thanks all.

-Joe

On 2/7/2022 10:35 AM, Joe Obernberger wrote:

Some more info.  Tried different GC strategies - no luck.
It only happens on large tables (more than 1 billion rows); it works fine on a 300 million row table.  There is very high CPU usage during the run.

I've tried setting spark.dse.continuousPagingEnabled to false and I've tried setting spark.cassandra.input.readsPerSec to 10; no effect.

Stats:

nodetool cfstats doc.doc
Total number of tables: 82
----------------
Keyspace : doc
        Read Count: 9620329
        Read Latency: 0.5629605546754171 ms
        Write Count: 510561482
        Write Latency: 0.02805177028806885 ms
        Pending Flushes: 0
                Table: doc
                SSTable count: 77
                Old SSTable count: 0
                Space used (live): 82061188941
                Space used (total): 82061188941
                Space used by snapshots (total): 0
                Off heap memory used (total): 317037065
                SSTable Compression Ratio: 0.3816525125492022
                Number of partitions (estimate): 101021793
                Memtable cell count: 209646
                Memtable data size: 44087966
                Memtable off heap memory used: 0
                Memtable switch count: 10
                Local read count: 25665
                Local read latency: NaN ms
                Local write count: 2459322
                Local write latency: NaN ms
                Pending flushes: 0
                Percent repaired: 0.0
                Bytes repaired: 0.000KiB
                Bytes unrepaired: 184.869GiB
                Bytes pending repair: 0.000KiB
                Bloom filter false positives: 2063
                Bloom filter false ratio: 0.01020
                Bloom filter space used: 169249016
                Bloom filter off heap memory used: 169248400
                Index summary off heap memory used: 50863401
                Compression metadata off heap memory used: 96925264
                Compacted partition minimum bytes: 104
                Compacted partition maximum bytes: 943127
                Compacted partition mean bytes: 1721
                Average live cells per slice (last five minutes): NaN
                Maximum live cells per slice (last five minutes): 0
                Average tombstones per slice (last five minutes): NaN
                Maximum tombstones per slice (last five minutes): 0
                Dropped Mutations: 0


nodetool tablehistograms doc.doc
doc/doc histograms
Percentile      Read Latency     Write Latency     SSTables    Partition Size    Cell Count
                    (micros)          (micros)                         (bytes)
50%                     0.00              0.00         0.00              1109            86
75%                     0.00              0.00         0.00              3311           215
95%                     0.00              0.00         0.00              3311           215
98%                     0.00              0.00         0.00              3311           215
99%                     0.00              0.00         0.00              3311           215
Min                     0.00              0.00         0.00               104             5
Max                     0.00              0.00         0.00            943127          2299

I'm stuck.

-Joe


On 2/3/2022 9:30 PM, manish khandelwal wrote:
It may be the case that you have lots of tombstones in this table, which makes reads slow and causes timeouts during bulk reads.

On Fri, Feb 4, 2022, 03:23 Joe Obernberger <joseph.obernber...@gmail.com> wrote:

    So it turns out that the value after PT is the timeout expressed in
    minutes (an ISO-8601 duration).  I changed the timeout to 960000, and
    now I get PT16M (960000 / 60000).  Since I'm still getting timeouts,
    something else must be wrong.

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 306 in stage 0.0 failed 4 times, most recent failure: Lost task 306.3 in stage 0.0 (TID 1180) (172.16.100.39 executor 0): com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT16M
            at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206)
            at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
            at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
            at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
            at com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:829)

    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
            at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
            at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
            at scala.Option.foreach(Option.scala:407)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT16M
            at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206)
            at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
            at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
            at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
            at com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

    -Joe

    On 2/3/2022 3:30 PM, Joe Obernberger wrote:

    I did find this:
    https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md

    And "spark.cassandra.read.timeoutMS" is set to 120000.

    Running a test now, and I think that is it.  Thank you Scott.

    -Joe

    On 2/3/2022 3:19 PM, Joe Obernberger wrote:

    Thank you Scott!
    I am using the spark cassandra connector.  Code:

    SparkSession spark = SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", "chaos")
            .config("spark.cassandra.connection.port", "9042")
            .master("spark://aether.querymasters.com:8181")
            .getOrCreate();

    Would I set PT2M in there?  Like .config("pt2m","300") ?
    I'm not familiar with jshell, so I'm not sure where you're
    getting that duration from.

    Right now, I'm just doing a count:
    Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "doc");
                    put("table", "doc");
                }
            }).load();

    dataset.count();


    Thank you!

    -Joe

    On 2/3/2022 3:01 PM, C. Scott Andreas wrote:
    Hi Joe, it looks like "PT2M" may refer to a timeout value
    that could be set by your Spark job's initialization of the
    client. I don't see a string matching this in the Cassandra
    codebase itself, but I do see that this is parseable as a
    Duration.

    ```
    jshell> java.time.Duration.parse("PT2M").getSeconds()
    $7 ==> 120
    ```
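
    For reference, the same check in plain Java for anyone not using jshell
    (a trivial sketch; the class name is only for the example - "PT2M" is an
    ISO-8601 duration meaning 2 minutes):

    ```java
    import java.time.Duration;

    // "PT2M" parses as an ISO-8601 duration of 2 minutes (120 seconds).
    public class DurationCheck {
        public static void main(String[] args) {
            Duration d = Duration.parse("PT2M");
            System.out.println(d.getSeconds()); // 120
        }
    }
    ```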

    The server-side log you see is likely an indicator of the
    timeout from the server's perspective. You might consider
    checking logs from the replicas for dropped reads, query
    aborts due to scanning more tombstones than the configured
    max, or other conditions indicating overload or an inability
    to serve a response.

    If you're running a Spark job, I'd recommend using the
    DataStax Spark Cassandra Connector, which distributes your
    query to executors addressing slices of the token range that
    land on replica sets, avoiding the scatter-gather behavior
    that can occur when using the Java driver alone.

    Cheers,

    – Scott


    On Feb 3, 2022, at 11:42 AM, Joe Obernberger
    <joseph.obernber...@gmail.com> wrote:


    Hi all - I'm using Cassandra 4.0.1 with a Spark job running against
    a large table (~8 billion rows), and I'm getting this error on the
    client side:
    Query timed out after PT2M

    On the server side I see a lot of messages like:
    DEBUG [Native-Transport-Requests-39] 2022-02-03 14:39:56,647 ReadCallback.java:119 - Timed out; received 0 of 1 responses

    The same code works on another table in the same Cassandra
    cluster that
    is about 300 million rows and completes in about 2 minutes. 
    The cluster
    is 13 nodes.

    I can't find what PT2M means. Perhaps the table needs a
    repair? Other
    ideas?
    Thank you!

    -Joe


    