Thanks, everybody. Setting the compaction throughput improved compaction performance: on the overloaded cluster the number of SSTables dropped from ~16K to ~7K. This way I can just wait until it stabilizes.
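For reference, a minimal shell sketch of the throughput change and how to watch it take effect (the 200 MB/s figure and the dump_es.table_b table come from the messages quoted below; check the exact flags against your nodetool version):

$ nodetool setcompactionthroughput 200   # raise the cap from the 16 MB/s default; 0 would unthrottle entirely
$ nodetool getcompactionthroughput       # confirm the new setting
$ nodetool compactionstats -H            # watch pending compactions and the remaining-time estimate drain
$ nodetool cfstats -H dump_es.table_b | grep 'SSTable count'   # track the SSTable count falling (~16K -> ~7K here)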
PS: This task is a one-time process. I am upgrading Elasticsearch from v2 to v6, and since I have the same data in Cassandra I decided to load it from Cassandra. I also needed to merge this data with other related documents, and Cassandra's static columns help greatly here by allowing one-to-many relations to be stored in a single table; that's why I copied the unjoined data from table_a to table_b.

2018-04-28 8:03 GMT+07:00 Evelyn Smith <u5015...@gmail.com>:

> Hi Mikhail,
>
> There are a few ways to speed up compactions in the short term:
>
> *- nodetool setcompactionthroughput 0*
> This unthrottles compactions, but obviously unthrottling compactions puts you at risk of high latency while compactions are running.
>
> *- nodetool setconcurrentcompactors 2*
> You usually want to set this to the lower of disks or cores. If you are using SSDs, use the number of cores, and it looks like d2.xlarge has 2 virtual cores.
>
> *- nodetool disablebinary*
> You can use this to stop an individual node from acting as coordinator. This lets the node focus on catching up on compactions; use it if one or two nodes have significantly higher pending compactions than the rest of the cluster.
>
> *- nodetool disablegossip / disablethrift*
> Same logic as above, except the node stops accepting writes, so you can only do it for ~2-2.5 hours or you risk inconsistent data by missing the hinted handoff window.
>
> Long term solutions:
>
> *- Consider switching instance type*
> The nodes you are using are storage optimised. They have very little processing power, which is what compactions need. Also, the AWS documentation seems to suggest HDD, not SSD, on this instance. Are you sure you actually have SSDs? That makes a big difference.
>
> *- Add nodes*
> The data will redistribute over more nodes and each node will be responsible for fewer compactions (less data ~= fewer compactions).
>
> *- If it's a batch load, make Spark do it*
> My impression is that you want to batch load from Cassandra to Elasticsearch after batch loading from Spark to Cassandra. If that is the case, why not get Spark to do the batch load, since it already has the data (maybe I'm misinterpreting what you are doing)?
>
> *- Consider throttling Spark when it batch loads to Cassandra*
> If Cassandra gets overwhelmed it can start acting up. Keep an eye out for lots of undersized SSTables; it can be a sign that Cassandra is running out of memory during the batch load and flushing lots of little memtables to disk as SSTables to conserve memory.
>
> Some final follow-up questions:
>
> - What is the purpose of this cluster?
> Is it there to support BAU, run daily analytics, or is it an occasional one-time cluster spun up for some analysis and then spun down? This info helps a lot in understanding where you can make concessions.
>
> - What is the flow of data and what are the timing requirements?
>
> Cheers,
> Eevee.
>
> On 28 Apr 2018, at 3:54 am, Mikhail Tsaplin <tsmis...@gmail.com> wrote:
>
> The cluster has 5 nodes of the d2.xlarge AWS type (32 GB RAM, attached SSD disks), Cassandra 3.0.9.
> I increased the compaction throughput from 16 to 200, and the active compaction remaining time decreased.
> What will happen if another node joins the cluster? Will the existing nodes move part of their SSTables to the new node unchanged, so that compaction time is reduced?
>
> $ nodetool cfstats -H dump_es
>
> Keyspace: table_b
> Read Count: 0
> Read Latency: NaN ms.
> Write Count: 0
> Write Latency: NaN ms.
> Pending Flushes: 0
> Table: table_b
> SSTable count: 18155
> Space used (live): 1.2 TB
> Space used (total): 1.2 TB
> Space used by snapshots (total): 0 bytes
> Off heap memory used (total): 3.62 GB
> SSTable Compression Ratio: 0.20371982719658258
> Number of keys (estimate): 712032622
> Memtable cell count: 0
> Memtable data size: 0 bytes
> Memtable off heap memory used: 0 bytes
> Memtable switch count: 0
> Local read count: 0
> Local read latency: NaN ms
> Local write count: 0
> Local write latency: NaN ms
> Pending flushes: 0
> Bloom filter false positives: 0
> Bloom filter false ratio: 0.00000
> Bloom filter space used: 2.22 GB
> Bloom filter off heap memory used: 2.56 GB
> Index summary off heap memory used: 357.51 MB
> Compression metadata off heap memory used: 724.97 MB
> Compacted partition minimum bytes: 771 bytes
> Compacted partition maximum bytes: 1.55 MB
> Compacted partition mean bytes: 3.47 KB
> Average live cells per slice (last five minutes): NaN
> Maximum live cells per slice (last five minutes): 0
> Average tombstones per slice (last five minutes): NaN
> Maximum tombstones per slice (last five minutes): 0
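A hedged sketch of the short-term relief commands suggested above, plus a writer-side throttle for the Spark-to-Cassandra batch load. The nodetool commands are the ones named in the thread (setconcurrentcompactors only exists on newer Cassandra releases); the spark-cassandra-connector property names are the 2.x spellings, and the values and job name are placeholders, not recommendations from this thread:

$ nodetool setcompactionthroughput 0   # unthrottle compactions, at the cost of request latency
$ nodetool setconcurrentcompactors 2   # match the core count; only on Cassandra versions that ship this command
$ nodetool disablebinary               # stop the worst-backlogged node from coordinating CQL traffic
$ nodetool disablethrift
$ nodetool disablegossip               # keep this window shorter than the hinted handoff period
# later: nodetool enablegossip; nodetool enablethrift; nodetool enablebinary

# Throttle the Spark writer instead of letting it overwhelm the cluster
# (spark-cassandra-connector 2.x property names; values and job name are placeholders):
$ spark-submit \
    --conf spark.cassandra.output.throughputMBPerSec=5 \
    --conf spark.cassandra.output.concurrent.writes=2 \
    <your-load-job.jar>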
> 2018-04-27 22:21 GMT+07:00 Nicolas Guyomar <nicolas.guyo...@gmail.com>:
>
>> Hi Mikhail,
>>
>> Could you please provide:
>> - your cluster version/topology (number of nodes, CPU, RAM available, etc.)
>> - what kind of underlying storage you are using
>> - cfstats using the -H option, because I'm never sure I'm converting bytes => GB correctly
>>
>> You are storing 1 TB per node, so long-running compactions are not really a surprise. You can play with the number of concurrent compaction threads and the compaction throughput to begin with.
>>
>> On 27 April 2018 at 16:59, Mikhail Tsaplin <tsmis...@gmail.com> wrote:
>>
>>> Hi,
>>> I have a five-node C* cluster suffering from a large number of pending compaction tasks: 1) 571; 2) 91; 3) 367; 4) 22; 5) 232.
>>>
>>> Initially it held one big table (table_a). With Spark, I read that table, extended its data and stored it in a second table, table_b. After this copying/extending process the number of compaction tasks in the cluster grew. From nodetool cfstats (see output at the bottom): table_a has 20 SSTables and table_b has 18219.
>>>
>>> As I understand it, table_b has a large number of SSTables because the data was transferred from one table to the other within a short time, and eventually these SSTables will be compacted. But now I have to read all the data from table_b and send it to Elasticsearch, and when Spark reads this table some Cassandra nodes die from OOM.
>>>
>>> I think that once compaction completes, the Spark reading job will work fine.
>>>
>>> The question is how I can speed up the compaction process. If I add another two nodes to the cluster, will compaction finish faster? Or will the data just be copied to the new nodes while compaction continues on the original set of SSTables?
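An aside on the OOM during the Spark read mentioned above: besides waiting for compaction, the per-request pressure on each node can usually be reduced by shrinking the connector's split and paging sizes. This is only a sketch; the property names are the spark-cassandra-connector 2.x spellings, and the values and job name are placeholders, not settings taken from this thread:

# Smaller token-range splits and smaller CQL pages per request
# reduce how much each Cassandra node has to serve at once:
$ spark-submit \
    --conf spark.cassandra.input.split.sizeInMB=64 \
    --conf spark.cassandra.input.fetch.sizeInRows=500 \
    --conf spark.cassandra.read.timeoutMS=120000 \
    <your-export-job.jar>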
>>> *Nodetool cfstats output:*
>>>
>>> Table: table_a
>>> SSTable count: 20
>>> Space used (live): 1064889308052
>>> Space used (total): 1064889308052
>>> Space used by snapshots (total): 0
>>> Off heap memory used (total): 1118106937
>>> SSTable Compression Ratio: 0.12564594959566894
>>> Number of keys (estimate): 56238959
>>> Memtable cell count: 76824
>>> Memtable data size: 115531402
>>> Memtable off heap memory used: 0
>>> Memtable switch count: 17
>>> Local read count: 0
>>> Local read latency: NaN ms
>>> Local write count: 77308
>>> Local write latency: 0.045 ms
>>> Pending flushes: 0
>>> Bloom filter false positives: 0
>>> Bloom filter false ratio: 0.00000
>>> Bloom filter space used: 120230328
>>> Bloom filter off heap memory used: 120230168
>>> Index summary off heap memory used: 2837249
>>> Compression metadata off heap memory used: 995039520
>>> Compacted partition minimum bytes: 1110
>>> Compacted partition maximum bytes: 52066354
>>> Compacted partition mean bytes: 133152
>>> Average live cells per slice (last five minutes): NaN
>>> Maximum live cells per slice (last five minutes): 0
>>> Average tombstones per slice (last five minutes): NaN
>>> Maximum tombstones per slice (last five minutes): 0
>>>
>>> nodetool cfstats table_b
>>> Keyspace: dump_es
>>> Read Count: 0
>>> Read Latency: NaN ms.
>>> Write Count: 0
>>> Write Latency: NaN ms.
>>> Pending Flushes: 0
>>> Table: table_b
>>> SSTable count: 18219
>>> Space used (live): 1316641151665
>>> Space used (total): 1316641151665
>>> Space used by snapshots (total): 0
>>> Off heap memory used (total): 3863604976
>>> SSTable Compression Ratio: 0.20387645535477916
>>> Number of keys (estimate): 712032622
>>> Memtable cell count: 0
>>> Memtable data size: 0
>>> Memtable off heap memory used: 0
>>> Memtable switch count: 0
>>> Local read count: 0
>>> Local read latency: NaN ms
>>> Local write count: 0
>>> Local write latency: NaN ms
>>> Pending flushes: 0
>>> Bloom filter false positives: 0
>>> Bloom filter false ratio: 0.00000
>>> Bloom filter space used: 2382971488
>>> Bloom filter off heap memory used: 2742320056
>>> Index summary off heap memory used: 371500752
>>> Compression metadata off heap memory used: 749784168
>>> Compacted partition minimum bytes: 771
>>> Compacted partition maximum bytes: 1629722
>>> Compacted partition mean bytes: 3555
>>> Average live cells per slice (last five minutes): 132.375
>>> Maximum live cells per slice (last five minutes): 149
>>> Average tombstones per slice (last five minutes): 1.0
>>> Maximum tombstones per slice (last five minutes): 1
>>>
>>> ------------------
>>>
>>> I logged the CQL requests coming from Spark and checked how one such request performs: it fetches 8075 rows, 59 MB of data, in 155 s (see the check output below).
>>>
>>> $ date; echo 'SELECT "scan_id", "snapshot_id", "scan_doc", "snapshot_doc" FROM "dump_es"."table_b" WHERE token("scan_id") > 946122293981930504 AND token("scan_id") <= 946132293981930504 ALLOW FILTERING;' | cqlsh --request-timeout=3600 | wc ; date
>>>
>>> Fri Apr 27 13:32:55 UTC 2018
>>>    8076   61191 59009831
>>> Fri Apr 27 13:35:30 UTC 2018
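For completeness, a hedged sketch of stepping through the token ring with the same query shape timed above. The keyspace, table, and column names come from the thread, and the slice width matches the 10^13-token range used in that check; the start/end values and output file are placeholders, not values from the thread:

#!/bin/bash
# Export table_b in consecutive token slices, mirroring the query timed above.
# START/END are placeholders; the full Murmur3 ring spans -2^63 .. 2^63-1.
START=946122293981930504
END=946222293981930504
STEP=10000000000000   # same width as the slice timed above (~8K rows / ~59 MB per slice)
lo=$START
while (( lo < END )); do
  hi=$(( lo + STEP ))
  echo "SELECT \"scan_id\", \"snapshot_id\", \"scan_doc\", \"snapshot_doc\" FROM \"dump_es\".\"table_b\" WHERE token(\"scan_id\") > $lo AND token(\"scan_id\") <= $hi ALLOW FILTERING;" \
    | cqlsh --request-timeout=3600 >> table_b_slices.out
  lo=$hi
done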