Thanks, everybody.
Raising the compaction throughput improved compaction performance. On the
overloaded cluster the number of SSTables dropped from ~16K to ~7K, so now I
can simply wait until it stabilizes.
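
For anyone who wants to watch the SSTable count while waiting, here is a
minimal sketch of pulling it out of `nodetool cfstats` text. It assumes the
output layout shown in the quoted messages further down ("Table: ..." followed
by "SSTable count: ..."); the function name `sstable_counts` is my own, not
part of any Cassandra tooling.

```python
import re


def sstable_counts(cfstats_text):
    """Map each table name to its SSTable count from `nodetool cfstats` text."""
    counts = {}
    table = None
    for line in cfstats_text.splitlines():
        # Drop any mail quoting prefix ("> ", ">>> ") and surrounding whitespace.
        line = line.lstrip("> \t").strip()
        m = re.match(r"Table: (\S+)$", line)
        if m:
            table = m.group(1)
            continue
        m = re.match(r"SSTable count: (\d+)$", line)
        if m and table is not None:
            counts[table] = int(m.group(1))
    return counts


# Sample text shaped like the cfstats output quoted in this thread.
sample = """\
Keyspace: dump_es
        Pending Flushes: 0
                Table: table_b
                SSTable count: 18155
                Space used (live): 1.2 TB
"""
print(sstable_counts(sample))  # {'table_b': 18155}
```

Feeding it the saved output of `nodetool cfstats` at intervals should be enough
to see whether the count is still falling.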

PS
This task is a one-time process: I am upgrading Elasticsearch from v2 to
v6, and since I have the same data in Cassandra, I decided to load it from
Cassandra. I also needed to merge this data with other related documents,
and a Cassandra static column helps greatly here by allowing one-to-many
relations to be stored in a single table. That is why I copied the unjoined
data from table_a to table_b.
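
To illustrate what I mean by the static column, here is a toy Python model,
not Cassandra code. The class `StaticColumnTable` and its methods are
hypothetical. The idea it sketches: a static value is stored once per
partition and comes back on every row of that partition. Mapping this onto the
columns in the query quoted below (for example `scan_doc` as the static value
per `scan_id`, with one row per `snapshot_id`) is an assumption for
illustration only.

```python
class StaticColumnTable:
    """Toy model: partition key, clustering key, one static and one regular column."""

    def __init__(self):
        self._static = {}  # partition key -> static value (stored once)
        self._rows = {}    # partition key -> {clustering key: regular value}

    def set_static(self, pk, value):
        # One value shared by every row in the partition.
        self._static[pk] = value

    def upsert_row(self, pk, ck, value):
        self._rows.setdefault(pk, {})[ck] = value

    def select(self, pk):
        # Each returned row carries the partition's single static value.
        static = self._static.get(pk)
        rows = self._rows.get(pk, {})
        return [(pk, ck, static, rows[ck]) for ck in sorted(rows)]


table = StaticColumnTable()
table.set_static("scan-1", "scan document")          # the "one" side
table.upsert_row("scan-1", "snap-a", "snapshot A")   # the "many" side
table.upsert_row("scan-1", "snap-b", "snapshot B")
for row in table.select("scan-1"):
    print(row)
```

The model ignores details of the real thing, such as how a partition holding
only a static value is returned.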

2018-04-28 8:03 GMT+07:00 Evelyn Smith <u5015...@gmail.com>:

> Hi Mikhail,
>
> There are a few ways to speed up compactions in the short term:
> *- nodetool setcompactionthroughput 0*
> This will unthrottle compactions but obviously unthrottling compactions
> puts you at risk of high latency while compactions are running.
> *- nodetool setconcurrentcompactors 2*
> You usually want to set this to the lower of the number of disks or the
> number of cores. If you are using SSDs, use the number of cores, which
> looks like 2 virtual cores on d2.xlarge.
> *- nodetool disablebinary*
> You can use this to stop an individual node from acting as coordinator.
> This lets the node focus on catching up on compactions, and you can use
> it if one or two nodes have significantly more pending compactions than
> the rest of the cluster.
> *- nodetool disablegossip / disablethrift*
> Same logic as above, except that the node also stops accepting writes, and
> you can only do it for ~2-2.5 hours or you risk inconsistent data by
> missing the hinted handoff period.
>
> Long term solutions:
> *- Consider switching instance type*
> The nodes you are using are storage optimised. They have very little
> processing power which is needed to process compactions. Also the AWS
> documentation seems to suggest HDD not SSD on this instance. Are you sure
> you actually have SSDs because that makes a big difference.
> *- Add nodes*
> The data will redistribute over more nodes and each node will be
> responsible for fewer compactions (less data ~= fewer compactions).
> *- If it’s a batch load make Spark do it*
> My impression is that you want to batch load from Cassandra to
> Elasticsearch after batch loading from Spark to Cassandra. If that is the
> case, why not get Spark to do the batch load if it already has the data
> (maybe I’m misinterpreting what you are doing).
> *- Consider throttling Spark when it batch loads to Cassandra*
> If Cassandra gets overwhelmed it can start acting up. Keep an eye out for
> lots of undersized SSTables: that might be a sign that Cassandra is running
> out of memory during the batch load and flushing lots of little memtables
> to disk as SSTables to conserve memory.
>
> Some final follow up questions:
> - What is the purpose of this cluster?
> Is it to support BAU, run daily analytics, or even serve as an occasional
> one-time cluster spun up for some analysis before being spun down? This
> info helps a lot in understanding where you can make concessions.
>  - What is the flow of data and what are the timing requirements?
>
> Cheers,
> Eevee.
>
>
> On 28 Apr 2018, at 3:54 am, Mikhail Tsaplin <tsmis...@gmail.com> wrote:
>
> The cluster has 5 nodes of the d2.xlarge AWS type (32GB RAM, attached SSD
> disks), running Cassandra 3.0.9.
> I increased compaction throughput from 16 to 200, and the remaining time of
> the active compactions decreased.
> What will happen if another node joins the cluster? Will the existing
> nodes move part of their SSTables to the new node unchanged, so that
> compaction time is reduced?
>
>
>
> $ nodetool cfstats -H  dump_es
>
>
> Keyspace: dump_es
>         Read Count: 0
>         Read Latency: NaN ms.
>         Write Count: 0
>         Write Latency: NaN ms.
>         Pending Flushes: 0
>                 Table: table_b
>                 SSTable count: 18155
>                 Space used (live): 1.2 TB
>                 Space used (total): 1.2 TB
>                 Space used by snapshots (total): 0 bytes
>                 Off heap memory used (total): 3.62 GB
>                 SSTable Compression Ratio: 0.20371982719658258
>                 Number of keys (estimate): 712032622
>                 Memtable cell count: 0
>                 Memtable data size: 0 bytes
>                 Memtable off heap memory used: 0 bytes
>                 Memtable switch count: 0
>                 Local read count: 0
>                 Local read latency: NaN ms
>                 Local write count: 0
>                 Local write latency: NaN ms
>                 Pending flushes: 0
>                 Bloom filter false positives: 0
>                 Bloom filter false ratio: 0.00000
>                 Bloom filter space used: 2.22 GB
>                 Bloom filter off heap memory used: 2.56 GB
>                 Index summary off heap memory used: 357.51 MB
>                 Compression metadata off heap memory used: 724.97 MB
>                 Compacted partition minimum bytes: 771 bytes
>                 Compacted partition maximum bytes: 1.55 MB
>                 Compacted partition mean bytes: 3.47 KB
>                 Average live cells per slice (last five minutes): NaN
>                 Maximum live cells per slice (last five minutes): 0
>                 Average tombstones per slice (last five minutes): NaN
>                 Maximum tombstones per slice (last five minutes): 0
>
>
> 2018-04-27 22:21 GMT+07:00 Nicolas Guyomar <nicolas.guyo...@gmail.com>:
>
>> Hi Mikhail,
>>
>> Could you please provide:
>> - your cluster version/topology (number of nodes, CPU, RAM available, etc.)
>> - what kind of underlying storage you are using
>> - cfstats output using the -H option, because I'm never sure I'm
>> converting bytes to GB correctly
>>
>> You are storing 1 TB per node, so long-running compaction is not really a
>> surprise. To begin with, you can play with the number of concurrent
>> compaction threads and the compaction throughput.
>>
>>
>> On 27 April 2018 at 16:59, Mikhail Tsaplin <tsmis...@gmail.com> wrote:
>>
>>> Hi,
>>> I have a five-node C* cluster suffering from a large number of pending
>>> compaction tasks: 1) 571; 2) 91; 3) 367; 4) 22; 5) 232
>>>
>>> Initially it held one big table (table_a). With Spark, I read that
>>> table, extended its data, and stored the result in a second table,
>>> table_b. After this copying/extending process the number of compaction
>>> tasks in the cluster grew. From nodetool cfstats (see output at the
>>> bottom): table_a has 20 SSTables and table_b has 18219.
>>>
>>> As I understand it, table_b has a large number of SSTables because the
>>> data was transferred from one table to the other within a short time, and
>>> eventually this table will be compacted. But now I have to read all the
>>> data from table_b and send it to Elasticsearch. When Spark reads this
>>> table, some Cassandra nodes die because of OOM.
>>>
>>> I think that once compaction completes, the Spark reading job will work
>>> fine.
>>>
>>> The question is: how can I speed up the compaction process? If I add
>>> another two nodes to the cluster, will compaction finish faster? Or will
>>> the data be copied to the new nodes while compaction continues on the
>>> original set of SSTables?
>>>
>>>
>>> Nodetool cfstats output:
>>>
>>>                 Table: table_a
>>>                 SSTable count: 20
>>>                 Space used (live): 1064889308052
>>>                 Space used (total): 1064889308052
>>>                 Space used by snapshots (total): 0
>>>                 Off heap memory used (total): 1118106937
>>>                 SSTable Compression Ratio: 0.12564594959566894
>>>                 Number of keys (estimate): 56238959
>>>                 Memtable cell count: 76824
>>>                 Memtable data size: 115531402
>>>                 Memtable off heap memory used: 0
>>>                 Memtable switch count: 17
>>>                 Local read count: 0
>>>                 Local read latency: NaN ms
>>>                 Local write count: 77308
>>>                 Local write latency: 0.045 ms
>>>                 Pending flushes: 0
>>>                 Bloom filter false positives: 0
>>>                 Bloom filter false ratio: 0.00000
>>>                 Bloom filter space used: 120230328
>>>                 Bloom filter off heap memory used: 120230168
>>>                 Index summary off heap memory used: 2837249
>>>                 Compression metadata off heap memory used: 995039520
>>>                 Compacted partition minimum bytes: 1110
>>>                 Compacted partition maximum bytes: 52066354
>>>                 Compacted partition mean bytes: 133152
>>>                 Average live cells per slice (last five minutes): NaN
>>>                 Maximum live cells per slice (last five minutes): 0
>>>                 Average tombstones per slice (last five minutes): NaN
>>>                 Maximum tombstones per slice (last five minutes): 0
>>>
>>>
>>> nodetool cfstats table_b
>>> Keyspace: dump_es
>>>         Read Count: 0
>>>         Read Latency: NaN ms.
>>>         Write Count: 0
>>>         Write Latency: NaN ms.
>>>         Pending Flushes: 0
>>>                 Table: table_b
>>>                 SSTable count: 18219
>>>                 Space used (live): 1316641151665
>>>                 Space used (total): 1316641151665
>>>                 Space used by snapshots (total): 0
>>>                 Off heap memory used (total): 3863604976
>>>                 SSTable Compression Ratio: 0.20387645535477916
>>>                 Number of keys (estimate): 712032622
>>>                 Memtable cell count: 0
>>>                 Memtable data size: 0
>>>                 Memtable off heap memory used: 0
>>>                 Memtable switch count: 0
>>>                 Local read count: 0
>>>                 Local read latency: NaN ms
>>>                 Local write count: 0
>>>                 Local write latency: NaN ms
>>>                 Pending flushes: 0
>>>                 Bloom filter false positives: 0
>>>                 Bloom filter false ratio: 0.00000
>>>                 Bloom filter space used: 2382971488
>>>                 Bloom filter off heap memory used: 2742320056
>>>                 Index summary off heap memory used: 371500752
>>>                 Compression metadata off heap memory used: 749784168
>>>                 Compacted partition minimum bytes: 771
>>>                 Compacted partition maximum bytes: 1629722
>>>                 Compacted partition mean bytes: 3555
>>>                 Average live cells per slice (last five minutes): 132.375
>>>                 Maximum live cells per slice (last five minutes): 149
>>>                 Average tombstones per slice (last five minutes): 1.0
>>>                 Maximum tombstones per slice (last five minutes): 1
>>>
>>>
>>> ------------------
>>>
>>>
>>> I logged the CQL requests coming from Spark and checked how one such
>>> request performs: it fetches 8075 rows (59 MB of data) in 155 s (see the
>>> check output below).
>>>
>>> $ date; echo 'SELECT "scan_id", "snapshot_id", "scan_doc",
>>> "snapshot_doc" FROM "dump_es"."table_b" WHERE token("scan_id") >
>>> 946122293981930504 AND token("scan_id") <= 946132293981930504
>>> ALLOW FILTERING;' | cqlsh --request-timeout=3600 | wc ; date
>>>
>>>
>>> Fri Apr 27 13:32:55 UTC 2018
>>>    8076   61191 59009831
>>> Fri Apr 27 13:35:30 UTC 2018
>>>
>>>
>>>
>>
>
>
