Currently, to prevent read failure in the case of a single replica failure, I need to specify CL > ONE when updating a counter, and when such an add happens, the wait is longer than for a regular insert:
coordinator ---> leader ---> leader does sstable tally-up ---> leader waits for at least one replica to ack the receipt of the tallied count

compared to a regular insert:

coordinator ---> wait for > 1 replicas to respond

I think we can achieve the same wait time as a regular insert if we can assume that the communication between Cassandra nodes satisfies the FIFO property. This is a similar line of thinking to the one that enabled ZooKeeper to come up with a simpler protocol than, say, Paxos. By the same argument that ZooKeeper made: we already use TCP, and TCP does careful work to ensure FIFO, so the communication between coordinator and replicas is FIFO. The actual message-sending mechanism used by Cassandra is OutboundTcpConnection; I understand that it sits above the TCP level, but if someone can verify that it indeed guarantees FIFO, then the following protocol works for counter adds.

Let's say we have 16 nodes in total in our cluster (nodes 1, 2, ..., 16) and RF=3, and a counter add request (+1) arrives at node 10, which determines that the write should go to nodes 1, 2, 3. Every node, when it acts as a coordinator, keeps an increasing serial number that it attaches to each message it sends out (we don't even need to call this a logical clock, since we don't care about messages arriving at the coordinator, nor does a coordinator receive any messages from other coordinators). Node 10 then sends a message (mutation) { +1, serial=1 } to nodes 1, 2, 3; it can wait for the number of replies specified by CL.

Node 1 keeps a table of the count it has seen from each coordinator. (This is similar to what #1072 has now, but the important difference is that the count is bucketed by coordinator, not by leader.) Node 1's table now becomes:

----------------------------------------------
coordinator 1      null
coordinator 2      null
.....
coordinator 9      null
coordinator 10     count = 0+1 = 1, highest_serial = 1
----------------------------------------------

and the same for nodes 2 and 3.

As more requests are delivered from node 10 to these 3 replicas, we may typically see something like this:

node 1's table:
----------------------------------------------------------------------
...
coord 10    highest_serial=10    count=123   ( memtable{count=23}   SST1{count=100} )
----------------------------------------------------------------------

node 2's table:
----------------------------------------------------------------------
.....
coord 10    highest_serial=9     count=100   ( memtable{count=90}   SST1{count=10} )
----------------------------------------------------------------------

node 3's table:
----------------------------------------------------------------------
......
coord 10    highest_serial=14    count=140   ( memtable{count=10}   SST1{count=130} )
----------------------------------------------------------------------

We claim that, because of the FIFO assumption, node 2 has seen all messages from coord 10 with serial <= 9, and node 3 has seen all messages with serial <= 14, so node 2's history is a prefix of node 3's. In other words, what we read out immediately from node 2 is a value we could have read out from node 3 at some past instant: node 2's current value is a valid value, although slightly stale.
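To make the bookkeeping concrete, here is a rough Java sketch of the idea (class and method names are made up for illustration, they are not the actual #1072 code, and the real per-delta records would of course live in memtables/sstables rather than a single long):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Rough sketch only -- hypothetical names, not the actual #1072 classes. */
public class CoordinatorCountTable
{
    /** Coordinator side: one monotonically increasing serial per node. */
    public static final class CoordinatorSerial
    {
        private final AtomicLong next = new AtomicLong(0);
        public long nextSerial() { return next.incrementAndGet(); }
    }

    /** Replica-side state tracked for one coordinator. */
    private static final class Shard
    {
        long highestSerial = 0;  // highest serial seen from this coordinator
        long count = 0;          // sum of all deltas with serial <= highestSerial
    }

    // keyed by coordinator id; bounded by the cluster size, so it stays small
    private final Map<Integer, Shard> shards = new HashMap<Integer, Shard>();

    /** Apply a mutation { delta, serial } sent by the given coordinator. */
    public synchronized void applyDelta(int coordinatorId, long serial, long delta)
    {
        Shard shard = shards.get(coordinatorId);
        if (shard == null)
        {
            shard = new Shard();
            shards.put(coordinatorId, shard);
        }
        // FIFO assumption: messages from one coordinator arrive in serial order,
        // so anything with serial <= highestSerial has already been counted.
        if (serial > shard.highestSerial)
        {
            shard.count += delta;
            shard.highestSerial = serial;
        }
    }

    /** This replica's current view of the total: sum over all coordinators. */
    public synchronized long localTotal()
    {
        long total = 0;
        for (Shard s : shards.values())
            total += s.count;
        return total;
    }
}

Note that the coordinator never talks to a leader here: it just stamps the mutation with nextSerial() and sends it to all replicas, and each replica can ack as soon as applyDelta() returns, exactly like a regular insert.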
One slight technical difficulty is repair. Let's say we use node 3's value to repair node 2: node 3 sends over total_count=140, serial=14, and node 2 sees that its own serial 9 < 14, so it updates its current record to count=140, serial=14. Moreover, node 2 should wipe out its old records. This can be done by placing an "invalidator" record, so that when node 2 does the tally-up later it ignores all records with a serial smaller than the invalidator's; alternatively, we can place a fake record with count = -100, effectively cancelling out the effects of the previous deltas.

The above shows the procedure for counting the updates from one fixed coordinator; the same procedure can be carried out for each individual coordinator, completely separately. If we want to read out the total count, we can simply read the count from one replica, or even better, from multiple replicas and do reconciliation.

Realistically we are not going to have a cluster of more than 1000 nodes, so each node's table is not going to grow indefinitely.

We claim that the benefit of this protocol is that the client and coordinator get the same fast write/update path as regular columns, without a read-then-write pattern, essentially doing "fire and forget", while the tally-up is left to background work. We also avoid the extra "leader" network hop required by the current implementation.

Thanks
Yang
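P.S. in case the read/reconciliation part is not clear, here is an equally rough sketch (again with made-up names) of how a reader would merge the per-coordinator tables reported by several replicas, and of the repair rule where the higher serial wins per the prefix argument above; the invalidator / negative-delta cleanup is only indicated in a comment:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Rough sketch only -- hypothetical names, not actual Cassandra code. */
public class CounterReconciler
{
    /** One coordinator's shard as reported by a replica: (highest serial, count). */
    public static final class Shard
    {
        final long highestSerial;
        final long count;
        Shard(long highestSerial, long count)
        {
            this.highestSerial = highestSerial;
            this.count = count;
        }
    }

    /**
     * Merge the per-coordinator tables reported by several replicas: for each
     * coordinator the entry with the highest serial is the freshest valid value
     * (lower-serial entries are prefixes of it), then sum across coordinators.
     */
    public static long reconcile(List<Map<Integer, Shard>> replicaTables)
    {
        Map<Integer, Shard> freshest = new HashMap<Integer, Shard>();
        for (Map<Integer, Shard> table : replicaTables)
            for (Map.Entry<Integer, Shard> e : table.entrySet())
            {
                Shard best = freshest.get(e.getKey());
                if (best == null || e.getValue().highestSerial > best.highestSerial)
                    freshest.put(e.getKey(), e.getValue());
            }
        long total = 0;
        for (Shard s : freshest.values())
            total += s.count;
        return total;
    }

    /**
     * Repair rule for one coordinator's shard: the higher serial wins. The losing
     * replica must then invalidate (or cancel with a negative delta) its old
     * per-delta records below the winning serial before its next tally-up.
     */
    public static Shard repair(Shard local, Shard remote)
    {
        return remote.highestSerial > local.highestSerial ? remote : local;
    }

    public static void main(String[] args)
    {
        // the example above: coord 10 as seen by node 2 (serial 9) and node 3 (serial 14)
        Map<Integer, Shard> node2 = new HashMap<Integer, Shard>();
        node2.put(10, new Shard(9, 100));
        Map<Integer, Shard> node3 = new HashMap<Integer, Shard>();
        node3.put(10, new Shard(14, 140));
        System.out.println(reconcile(Arrays.asList(node2, node3)));       // 140
        System.out.println(repair(node2.get(10), node3.get(10)).count);   // 140
    }
}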