bdeggleston commented on code in PR #4761:
URL: https://github.com/apache/cassandra/pull/4761#discussion_r3171114112


##########
src/java/org/apache/cassandra/replication/MutationTrackingService.java:
##########
@@ -1372,6 +1374,94 @@ static int loadHostLogIdFromSystemTable()
         return rows.one().getInt("host_log_id");
     }
 
+    private static class BackgroundReconciler
+    {
+        private static final long RECONCILE_INTERVAL_MILLIS = 1_000;

Review Comment:
   I think this should probably be a config property / hot prop
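
   A minimal sketch of one way to make the interval adjustable at runtime. `scheduleWithFixedDelay` fixes the delay when the task is scheduled, so this version reschedules itself after each run and rereads the delay every time. `AdjustableDelayTask` and its methods are hypothetical names for illustration, not part of the PR or of Cassandra's config machinery.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not in the PR): reruns a task with a delay that can be changed at runtime.
final class AdjustableDelayTask
{
    private final ScheduledExecutorService executor;
    private final Runnable task;
    private final AtomicLong delayMillis;

    AdjustableDelayTask(ScheduledExecutorService executor, Runnable task, long initialDelayMillis)
    {
        if (initialDelayMillis <= 0)
            throw new IllegalArgumentException("delay must be positive");
        this.executor = executor;
        this.task = task;
        this.delayMillis = new AtomicLong(initialDelayMillis);
    }

    // A config setter or JMX operation could call this; it takes effect on the next reschedule.
    void setDelayMillis(long millis)
    {
        if (millis <= 0)
            throw new IllegalArgumentException("delay must be positive");
        delayMillis.set(millis);
    }

    long getDelayMillis()
    {
        return delayMillis.get();
    }

    void start()
    {
        scheduleNext();
    }

    private void scheduleNext()
    {
        if (executor.isShutdown())
            return;
        try
        {
            // One-shot schedule: the delay is read fresh for every run.
            executor.schedule(this::runOnce, delayMillis.get(), TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e)
        {
            // The executor was shut down between the check and the schedule call; stop quietly.
        }
    }

    private void runOnce()
    {
        try
        {
            task.run();
        }
        catch (RuntimeException e)
        {
            // Report and continue so that one failed run does not end the loop.
            System.err.println("scheduled task failed: " + e);
        }
        finally
        {
            scheduleNext();
        }
    }
}
```

   The alternative is to cancel the `ScheduledFuture` and call `scheduleWithFixedDelay` again whenever the property changes; the self-rescheduling form avoids having to track the future.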



##########
src/java/org/apache/cassandra/replication/MutationTrackingService.java:
##########
@@ -1372,6 +1374,94 @@ static int loadHostLogIdFromSystemTable()
         return rows.one().getInt("host_log_id");
     }
 
+    private static class BackgroundReconciler
+    {
+        private static final long RECONCILE_INTERVAL_MILLIS = 1_000;
+
+        private volatile boolean isPaused = false;

Review Comment:
   Enabling/disabling should also be configurable and adjustable via JMX. That
   said, we should log a warning if we start up with mutation tracking
   keyspaces, or create them at runtime, while background reconciliation is
   disabled.
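
   A minimal sketch of the switch and the warning check, assuming the caller passes in the names of the mutation tracking keyspaces at startup and whenever one is created. `ReconcilerSwitch`, `onTrackedKeyspaces`, and the injected `warn` callback are hypothetical names for illustration; the actual config property and MBean wiring are left out.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical switch (not in the PR): holds the enabled flag and emits the warning.
final class ReconcilerSwitch
{
    private final AtomicBoolean enabled;
    private final Consumer<String> warn; // stands in for a logger's warn method

    ReconcilerSwitch(boolean initiallyEnabled, Consumer<String> warn)
    {
        this.enabled = new AtomicBoolean(initiallyEnabled);
        this.warn = warn;
    }

    boolean isEnabled()
    {
        return enabled.get();
    }

    // A getter/setter pair like this is the shape a JMX attribute would expose.
    void setEnabled(boolean value)
    {
        enabled.set(value);
    }

    // Call at startup with all tracked keyspaces, and again when one is created at runtime.
    void onTrackedKeyspaces(List<String> keyspaces)
    {
        if (!enabled.get() && !keyspaces.isEmpty())
            warn.accept("Background reconciliation is disabled but mutation tracking keyspaces exist: " + keyspaces);
    }
}
```

   The reconciler loop would then check `isEnabled()` where it currently checks `isPaused`.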



##########
src/java/org/apache/cassandra/replication/MutationTrackingService.java:
##########
@@ -1372,6 +1374,94 @@ static int loadHostLogIdFromSystemTable()
         return rows.one().getInt("host_log_id");
     }
 
+    private static class BackgroundReconciler
+    {
+        private static final long RECONCILE_INTERVAL_MILLIS = 1_000;
+
+        private volatile boolean isPaused = false;
+
+        void start()
+        {
+            executor.scheduleWithFixedDelay(this::run,
+                                            RECONCILE_INTERVAL_MILLIS,
+                                            RECONCILE_INTERVAL_MILLIS,
+                                            TimeUnit.MILLISECONDS);
+        }
+
+        void run()
+        {
+            MutationTrackingService.instance().forEachKeyspace(this::run);
+        }
+
+        private void run(KeyspaceShards shards)
+        {
+            if (!isPaused)
+                shards.forEachShard(this::run);
+        }
+
+        private void run(Shard shard)
+        {
+            try
+            {
+                List<Offsets.Immutable> missing = shard.collectLocallyMissingOffsets();

Review Comment:
   We need to take care not to spam nodes with the same mutation requests over
   and over here. Asking a node that's falling behind for multiple copies of
   the same mutation is going to help tip it over. Additionally, and I'm not
   sure to what degree this is actually a problem, without a cool-off period
   for new mutations here, we're going to issue some fetches for mutations
   that are already on the way, caused by races between mutation summary
   receipt and organic mutation receipt.
   
   That said, it may be best to open a follow-up JIRA for these improvements
   instead of adding to the scope of this one.
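
   A minimal sketch of the two guards described above, assuming each missing mutation can be identified by some key (for example a shard plus offset). `FetchThrottle`, its parameters, and the injected clock are hypothetical names for illustration, not part of the PR. A key is only requested once it has been seen missing for at least the cool-off period, and it is not requested again until the retry interval has passed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical throttle (not in the PR): limits how often a missing mutation is requested.
final class FetchThrottle<K>
{
    private static final class State
    {
        final long firstSeenNanos;
        boolean requested;
        long lastRequestedNanos;

        State(long firstSeenNanos)
        {
            this.firstSeenNanos = firstSeenNanos;
        }
    }

    private final ConcurrentHashMap<K, State> states = new ConcurrentHashMap<>();
    private final long coolOffNanos;
    private final long retryNanos;
    private final LongSupplier clock; // e.g. System::nanoTime; injectable for tests

    FetchThrottle(long coolOffNanos, long retryNanos, LongSupplier clock)
    {
        this.coolOffNanos = coolOffNanos;
        this.retryNanos = retryNanos;
        this.clock = clock;
    }

    // Returns true if a fetch for this key should be sent now, and records that it was sent.
    boolean shouldRequest(K key)
    {
        long now = clock.getAsLong();
        boolean[] allowed = { false };
        // compute() runs atomically per key, so concurrent callers cannot both be allowed.
        states.compute(key, (k, s) -> {
            if (s == null)
                s = new State(now);
            boolean cooledOff = now - s.firstSeenNanos >= coolOffNanos;
            boolean retryDue = !s.requested || now - s.lastRequestedNanos >= retryNanos;
            if (cooledOff && retryDue)
            {
                s.requested = true;
                s.lastRequestedNanos = now;
                allowed[0] = true;
            }
            return s;
        });
        return allowed[0];
    }

    // Call when the mutation arrives, whether organically or via a fetch, to drop its state.
    void onReceived(K key)
    {
        states.remove(key);
    }
}
```

   The reconciler would call `shouldRequest` for each locally missing offset before issuing a fetch, and `onReceived` from the mutation receipt path.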



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

