yifan-c commented on code in PR #73:
URL: 
https://github.com/apache/cassandra-analytics/pull/73#discussion_r1717309983


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskScheduler.java:
##########
@@ -34,53 +34,62 @@
 
 import org.apache.cassandra.util.ThreadUtil;
 
-public class HeartbeatReporter implements Closeable
+/**
+ * Scheduler for simple and short tasks
+ */
+public class SimpleTaskScheduler implements Closeable
 {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(HeartbeatReporter.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleTaskScheduler.class);
 
     private final ScheduledExecutorService scheduler;
-    private final Map<String, ScheduledFuture<?>> scheduledHeartbeats;
+    private final Map<String, ScheduledFuture<?>> scheduledTasks;
     private boolean isClosed;
 
-    public HeartbeatReporter()
+    public SimpleTaskScheduler()
     {
-        ThreadFactory tf = ThreadUtil.threadFactory("Heartbeat reporter");
+        ThreadFactory tf = 
ThreadUtil.threadFactory("bulk-write-simple-task-scheduler");
         this.scheduler = Executors.newSingleThreadScheduledExecutor(tf);
-        this.scheduledHeartbeats = new HashMap<>();
+        this.scheduledTasks = new HashMap<>();
         this.isClosed = false;
     }
 
-    public synchronized void schedule(String name, long 
heartBeatIntervalMillis, Runnable heartBeat)
+    public synchronized void schedule(String name, long delayMillis, Runnable 
task)
     {
-        if (isClosed)
+        if (isClosed() || isScheduled(name))
         {
-            LOGGER.info("HeartbeatReporter is already closed");
             return;
         }
 
-        if (scheduledHeartbeats.containsKey(name))
+        ScheduledFuture<?> fut = scheduler.schedule(new NoThrow(name, task),
+                                                    delayMillis,
+                                                    TimeUnit.MILLISECONDS);
+        scheduledTasks.put(name, fut);
+    }
+
+    public synchronized void schedulePeriodic(String name, long 
taskIntervalMillis, Runnable task)
+    {
+        if (isClosed() || isScheduled(name))
         {
-            LOGGER.info("The heartbeat has been scheduled already. 
heartbeat={}", name);
             return;
         }
-        ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new 
NoThrow(name, heartBeat),
-                                                                  
heartBeatIntervalMillis, // initial delay
-                                                                  
heartBeatIntervalMillis, // delay
+
+        ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new 
NoThrow(name, task),
+                                                                  
taskIntervalMillis, // initial delay
+                                                                  
taskIntervalMillis, // delay
                                                                   
TimeUnit.MILLISECONDS);
-        scheduledHeartbeats.put(name, fut);
+        scheduledTasks.put(name, fut);

Review Comment:
   Scheduling with the same name would be rejected by the _synchronized_ 
methods, which perform this check:
   ```
           if (isClosed() || isScheduled(name))
           {
               return;
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to