yifan-c commented on code in PR #73: URL: https://github.com/apache/cassandra-analytics/pull/73#discussion_r1717309983
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskScheduler.java: ########## @@ -34,53 +34,62 @@ import org.apache.cassandra.util.ThreadUtil; -public class HeartbeatReporter implements Closeable +/** + * Scheduler for simple and short tasks + */ +public class SimpleTaskScheduler implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatReporter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleTaskScheduler.class); private final ScheduledExecutorService scheduler; - private final Map<String, ScheduledFuture<?>> scheduledHeartbeats; + private final Map<String, ScheduledFuture<?>> scheduledTasks; private boolean isClosed; - public HeartbeatReporter() + public SimpleTaskScheduler() { - ThreadFactory tf = ThreadUtil.threadFactory("Heartbeat reporter"); + ThreadFactory tf = ThreadUtil.threadFactory("bulk-write-simple-task-scheduler"); this.scheduler = Executors.newSingleThreadScheduledExecutor(tf); - this.scheduledHeartbeats = new HashMap<>(); + this.scheduledTasks = new HashMap<>(); this.isClosed = false; } - public synchronized void schedule(String name, long heartBeatIntervalMillis, Runnable heartBeat) + public synchronized void schedule(String name, long delayMillis, Runnable task) { - if (isClosed) + if (isClosed() || isScheduled(name)) { - LOGGER.info("HeartbeatReporter is already closed"); return; } - if (scheduledHeartbeats.containsKey(name)) + ScheduledFuture<?> fut = scheduler.schedule(new NoThrow(name, task), + delayMillis, + TimeUnit.MILLISECONDS); + scheduledTasks.put(name, fut); + } + + public synchronized void schedulePeriodic(String name, long taskIntervalMillis, Runnable task) + { + if (isClosed() || isScheduled(name)) { - LOGGER.info("The heartbeat has been scheduled already. 
heartbeat={}", name); return; } - ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new NoThrow(name, heartBeat), - heartBeatIntervalMillis, // initial delay - heartBeatIntervalMillis, // delay + + ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new NoThrow(name, task), + taskIntervalMillis, // initial delay + taskIntervalMillis, // delay TimeUnit.MILLISECONDS); - scheduledHeartbeats.put(name, fut); + scheduledTasks.put(name, fut); Review Comment: Scheduling with the same name would be rejected by the _synchronized_ methods, which perform this check: ``` if (isClosed() || isScheduled(name)) { return; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org