sijie commented on a change in pull request #275: BOOKKEEPER-1102: Fix BookKeeeperDiskSpaceWeightedLedgerPlacementTest URL: https://github.com/apache/bookkeeper/pull/275#discussion_r128890618
########## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java ########## @@ -80,121 +78,278 @@ public String toString() { } } + + /** + * Tracks the most recently reported set of bookies from BookieWatcher as well + * as current BookieInfo for bookies we've successfully queried. + */ + private static class BookieInfoMap { + /** + * Contains the most recently obtained information on the contained bookies. + * When an error happens querying a bookie, the entry is removed. + */ + private final Map<BookieSocketAddress, BookieInfo> infoMap = new HashMap<>(); + + /** + * Contains the most recently reported set of bookies from BookieWatcher + * A partial query consists of every member of mostRecentlyReportedBookies + * minus the entries in bookieInfoMap. + */ + private Collection<BookieSocketAddress> mostRecentlyReportedBookies = new ArrayList<>(); + + public void updateBookies(Collection<BookieSocketAddress> updatedBookieSet) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "updateBookies: current: {}, new: {}", + mostRecentlyReportedBookies, updatedBookieSet); + } + infoMap.keySet().retainAll(updatedBookieSet); + mostRecentlyReportedBookies = updatedBookieSet; + } + + @SuppressWarnings("unchecked") + public Collection<BookieSocketAddress> getPartialScanTargets() { + return CollectionUtils.subtract(mostRecentlyReportedBookies, infoMap.keySet()); + } + + public Collection<BookieSocketAddress> getFullScanTargets() { + return mostRecentlyReportedBookies; + } + + /** + * Returns info for bookie, null if not known + * + * @param bookie bookie for which to get info + * @return Info for bookie, null otherwise + */ + public BookieInfo getInfo(BookieSocketAddress bookie) { + return infoMap.get(bookie); + } + + /** + * Removes bookie from bookieInfoMap + * + * @param bookie bookie on which we observed an error + */ + public void clearInfo(BookieSocketAddress bookie) { + infoMap.remove(bookie); + } + + /** + * Report new info on bookie + * + 
* @param bookie bookie for which we obtained new info + * @param info the new info + */ + public void gotInfo(BookieSocketAddress bookie, BookieInfo info) { + infoMap.put(bookie, info); + } + + /** + * Get bookie info map + */ + public Map<BookieSocketAddress, BookieInfo> getBookieMap() { + return infoMap; + } + } + private final BookieInfoMap bookieInfoMap = new BookieInfoMap(); + + /** + * Tracks whether there is an execution in progress as well as whether + * another is pending. + */ + static final int UNQUEUED = 0; + static final int PARTIAL = 1; + static final int FULL = 2; + private static class InstanceState { + private boolean running = false; + private int queuedType = UNQUEUED; + + private boolean shouldStart() { + if (running) { + return false; + } else { + running = true; + return true; + } + } + + /** + * Mark pending operation FULL and return true if there is no in-progress operation + * + * @return True if we should execute a scan, False if there is already one running + */ + public boolean tryStartFull() { + queuedType = FULL; + return shouldStart(); + } + + /** + * Mark pending operation PARTIAL if not full and return true if there is no in-progress operation + * + * @return True if we should execute a scan, False if there is already one running + */ + public boolean tryStartPartial() { + if (queuedType == UNQUEUED) { + queuedType = PARTIAL; + } + return shouldStart(); + } + + /** + * Gets and clears queuedType + */ + public int getAndClearQueuedType() { + int ret = queuedType; + queuedType = UNQUEUED; + return ret; + } + + /** + * If queuedType != UNQUEUED, returns true, leaves running equal to true + * Otherwise, returns false and sets running to false + */ + public boolean completeUnlessQueued() { + if (queuedType == UNQUEUED) { + running = false; + return false; + } else { + return true; + } + } + } + private final InstanceState instanceState = new InstanceState(); + BookieInfoReader(BookKeeper bk, - ClientConfiguration conf, - 
ScheduledExecutorService scheduler) { + ClientConfiguration conf, + ScheduledExecutorService scheduler) { this.bk = bk; this.conf = conf; this.scheduler = scheduler; } - void start() { + + public void start() { scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("Running periodic BookieInfo scan"); + synchronized (BookieInfoReader.this) { + if (LOG.isDebugEnabled()) { + LOG.debug("Running periodic BookieInfo scan"); + } + try { + Collection<BookieSocketAddress> updatedBookies = bk.bookieWatcher.getBookies(); + bookieInfoMap.updateBookies(updatedBookies); + } catch (BKException e) { + LOG.info("Got exception {} while querying bookies from watcher, rerunning after {}s", Review comment: Is there any reason to put the exception as the first parameter? It is common practice in slf4j to pass the exception as the last parameter, and you don't need a '{}' placeholder for exceptions. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services