Currently, to prevent read failures in the case of a single replica failure,
I need to specify CL > ONE when updating a counter, and when such an add
happens, the wait is longer than for a regular insert:

coordinator ---> leader ---> leader does sstable tally-up ---> leader waits
for at least one replica to ack receipt of the tallied count


compared to a regular insert:
coordinator ----> wait for > 1 replicas to respond



I think we can achieve the same wait time as a regular insert if we can
assume that the communication between Cassandra nodes satisfies the FIFO
property. This is a similar line of thinking to the one that enabled
ZooKeeper to come up with a simpler protocol than, say, Paxos. By the same
argument that ZooKeeper made: we already use TCP, and TCP does careful work
to ensure FIFO ordering, so the communication between coordinator and
replicas is FIFO. The actual message-sending mechanism used by Cassandra is
OutboundTcpConnection; I understand that it sits above the TCP level, but if
someone can verify that it indeed guarantees FIFO delivery, then the
following protocol works for counter adds:


Let's say we have 16 nodes in total in our cluster: node 1, 2, ..., 16,
and RF=3.
Now a counter add request (+1) arrives at node 10, and node 10 determines
that the write should go to nodes 1, 2, 3.
Every node, when it acts as a coordinator, keeps an increasing serial number
that is attached to each message it sends out (we don't even need to call
this a logical clock, since we don't care about messages arriving at the
coordinator, nor does a coordinator receive any messages from other
coordinators). Then node 10 sends out a message (mutation) { +1, serial=1 }
to nodes 1, 2, 3. It can wait for the number of replies specified by CL.
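
To make the coordinator side concrete, here is a minimal Java sketch of the
idea above; CounterCoordinator, CounterMutation and nextMutation are
hypothetical names for illustration, not Cassandra's actual classes:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical sketch of the coordinator-side serial number and the
    // counter mutation it attaches to each outgoing add.
    public class CounterCoordinator {
        // A mutation carrying the delta plus the coordinator id and serial.
        public static final class CounterMutation {
            public final int coordinatorId;
            public final long serial;
            public final long delta;

            CounterMutation(int coordinatorId, long serial, long delta) {
                this.coordinatorId = coordinatorId;
                this.serial = serial;
                this.delta = delta;
            }
        }

        private final int coordinatorId;                 // e.g. node 10
        private final AtomicLong serial = new AtomicLong(0);

        public CounterCoordinator(int coordinatorId) {
            this.coordinatorId = coordinatorId;
        }

        // Build the { +delta, serial=n } message that is then sent, in order,
        // over the (assumed FIFO) connection to each of the RF replicas.
        public CounterMutation nextMutation(long delta) {
            return new CounterMutation(coordinatorId, serial.incrementAndGet(), delta);
        }
    }

Node 10 would call nextMutation(+1), send the resulting message to nodes 1,
2 and 3 over their FIFO connections, and wait for the number of acks implied
by CL, exactly as for a regular insert.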

Node 1 keeps a table of the count it has seen from each coordinator (this is
similar to what #1072 has now, but the important difference is that the
count is bucketed by coordinator, not by leader); a sketch of this
replica-side table follows below.
Now node 1's table becomes:
----------------------------------------------
coordinator 1   null
coordinator 2   null
.....
coordinator 9  null
coordinator 10        count= 0+1 = 1, highest_serial = 1
.....
----------------------------------------

The same goes for nodes 2 and 3.
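
Here is a rough Java sketch of this per-coordinator table on a replica,
under the FIFO assumption; ReplicaCounterTable and its fields are
hypothetical names for illustration:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical replica-side table: one shard per coordinator, tracking the
    // highest serial seen from that coordinator and the running count of its deltas.
    public class ReplicaCounterTable {
        static final class Shard {
            long highestSerial = 0;
            long count = 0;
        }

        private final Map<Integer, Shard> byCoordinator = new HashMap<>();

        // Apply a mutation delivered over the (assumed FIFO) connection.
        // Under FIFO, serials from a given coordinator arrive in increasing
        // order, so this is a simple in-order apply; the serial check only
        // guards against a duplicate redelivery.
        public synchronized void apply(int coordinatorId, long serial, long delta) {
            Shard shard = byCoordinator.computeIfAbsent(coordinatorId, id -> new Shard());
            if (serial > shard.highestSerial) {
                shard.highestSerial = serial;
                shard.count += delta;
            }
        }
    }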

As more requests are delivered from node 10 to these three replicas, we may
typically see something like this:
node 1's table:
-----------------------------
...
coord 10   highest_serial=10   count=123  ( memtable{ count=23 }   SST1{ count=100 } )
-----------------------------

node 2's table:
-----------------------------
...
coord 10   highest_serial=9    count=100  ( memtable{ count=90 }   SST1{ count=10 } )
-----------------------------

node 3's table:
-----------------------------
...
coord 10   highest_serial=14   count=140  ( memtable{ count=10 }   SST1{ count=130 } )
-----------------------------



We claim that, because of the FIFO assumption, node 2 here has seen all
messages from coord 10 with serial <= 9, and node 3 has seen all messages
with serial <= 14, so node 2's history is a prefix of node 3's. In other
words, what we read immediately from node 2 represents a value that we could
have read from node 3 at some instant in the past, i.e. the current value on
node 2 is a valid value, although slightly stale.
One slight technical difficulty is what happens when repair occurs. Let's
say we use node 3's value to repair node 2: node 3 sends over
total_count=140, serial=14, and node 2 sees that its serial 9 < 14, so it
updates its current record to count=140, serial=14. Moreover, node 2 should
wipe out its old records. This can be done by placing an "invalidator"
record, so that when node 2 does its tally-up later, it ignores all records
with a serial smaller than the invalidator's; alternatively, we can place a
fake record with count = -100 (the negative of node 2's previous total),
which effectively wipes out the effects of the previous deltas.
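
A rough sketch of how that repair step might look on node 2's shard for
coord 10, modeling the "invalidator" as a floor serial below which stored
delta records are ignored at tally-up time (all names here are hypothetical):

    // Hypothetical shard for one coordinator on the replica being repaired.
    final class RepairableShard {
        long highestSerial = 0;     // e.g. 9 on node 2 before the repair
        long count = 0;             // e.g. 100 on node 2 before the repair
        long invalidatorSerial = 0; // records with serial <= this are ignored at tally-up

        // Called when the repair stream delivers the peer's (total, serial),
        // e.g. node 3's total_count=140, serial=14.
        synchronized void repairFrom(long peerTotal, long peerSerial) {
            if (peerSerial > highestSerial) {
                // The peer's total already subsumes every delta up to peerSerial,
                // so void our stored deltas and adopt the peer's value as baseline.
                invalidatorSerial = peerSerial;
                count = peerTotal;
                highestSerial = peerSerial;
            }
        }
    }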


The above shows the procedure for counting the updates from one fixed
coordinator; the same procedure can be carried out for all the individual
coordinators, completely separately. If we want to read out the total count,
we can simply read the count from one replica, or even better, from multiple
replicas and do reconciliation (see the sketch below). Realistically, we are
not going to have a cluster of more than 1000 nodes, so each node's table is
not going to grow indefinitely.
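
As a sketch of that read-side reconciliation, assume each replica returns its
per-coordinator (highest_serial, count) pairs; under FIFO, the entry with the
larger serial for a given coordinator is simply the fresher prefix, so
reconciliation is a per-coordinator max followed by a sum. The names below
are hypothetical, and a Java record is used just for brevity:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical read-side reconciliation across replicas.
    public final class CounterRead {
        public record Shard(long highestSerial, long count) {}

        public static long reconcile(List<Map<Integer, Shard>> replicaTables) {
            Map<Integer, Shard> freshest = new HashMap<>();
            for (Map<Integer, Shard> table : replicaTables) {
                // For each coordinator, keep the shard with the highest serial.
                table.forEach((coordinator, shard) ->
                    freshest.merge(coordinator, shard,
                        (a, b) -> a.highestSerial() >= b.highestSerial() ? a : b));
            }
            // The total counter value is the sum over all coordinators.
            return freshest.values().stream().mapToLong(Shard::count).sum();
        }
    }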



We claim that the benefit of this protocol is that the client and
coordinator can achieve the same fast write/update path as regular columns,
without a read-then-write pattern, and essentially do "fire and forget",
while the tally-up is left to background work. We also avoid the extra
"leader" network hop required by the current implementation.


Thanks
Yang
