sijie commented on a change in pull request #275: BOOKKEEPER-1102: Fix 
BookKeeeperDiskSpaceWeightedLedgerPlacementTest
URL: https://github.com/apache/bookkeeper/pull/275#discussion_r128890618
 
 

 ##########
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
 ##########
 @@ -80,121 +78,278 @@ public String toString() {
         }
     }
 
+
+    /**
+     * Tracks the most recently reported set of bookies from BookieWatcher as 
well
+     * as current BookieInfo for bookies we've successfully queried.
+     */
+    private static class BookieInfoMap {
+        /**
+         * Contains the most recently obtained information on the contained 
bookies.
+         * When an error happens querying a bookie, the entry is removed.
+         */
+        private final Map<BookieSocketAddress, BookieInfo> infoMap = new 
HashMap<>();
+
+        /**
+         * Contains the most recently reported set of bookies from 
BookieWatcher
+         * A partial query consists of every member of 
mostRecentlyReportedBookies
+         * minus the entries in bookieInfoMap.
+         */
+        private Collection<BookieSocketAddress> mostRecentlyReportedBookies = 
new ArrayList<>();
+
+        public void updateBookies(Collection<BookieSocketAddress> 
updatedBookieSet) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "updateBookies: current: {}, new: {}",
+                        mostRecentlyReportedBookies, updatedBookieSet);
+            }
+            infoMap.keySet().retainAll(updatedBookieSet);
+            mostRecentlyReportedBookies = updatedBookieSet;
+        }
+
+        @SuppressWarnings("unchecked")
+        public Collection<BookieSocketAddress> getPartialScanTargets() {
+            return CollectionUtils.subtract(mostRecentlyReportedBookies, 
infoMap.keySet());
+        }
+
+        public Collection<BookieSocketAddress> getFullScanTargets() {
+            return mostRecentlyReportedBookies;
+        }
+
+        /**
+         * Returns info for bookie, null if not known
+         *
+         * @param bookie bookie for which to get info
+         * @return Info for bookie, null otherwise
+         */
+        public BookieInfo getInfo(BookieSocketAddress bookie) {
+            return infoMap.get(bookie);
+        }
+
+        /**
+         * Removes bookie from bookieInfoMap
+         *
+         * @param bookie bookie on which we observed an error
+         */
+        public void clearInfo(BookieSocketAddress bookie) {
+            infoMap.remove(bookie);
+        }
+
+        /**
+         * Report new info on bookie
+         *
+         * @param bookie bookie for which we obtained new info
+         * @param info the new info
+         */
+        public void gotInfo(BookieSocketAddress bookie, BookieInfo info) {
+            infoMap.put(bookie, info);
+        }
+
+        /**
+         * Get bookie info map
+         */
+        public Map<BookieSocketAddress, BookieInfo> getBookieMap() {
+            return infoMap;
+        }
+    }
+    private final BookieInfoMap bookieInfoMap = new BookieInfoMap();
+
+    /**
+     * Tracks whether there is an execution in progress as well as whether
+     * another is pending.
+     */
+    static final int UNQUEUED = 0;
+    static final int PARTIAL = 1;
+    static final int FULL = 2;
+    private static class InstanceState {
+        private boolean running = false;
+        private int queuedType = UNQUEUED;
+
+        private boolean shouldStart() {
+            if (running) {
+                return false;
+            } else {
+                running = true;
+                return true;
+            }
+        }
+
+        /**
+         * Mark pending operation FULL and return true if there is no 
in-progress operation
+         *
+         * @return True if we should execute a scan, False if there is already 
one running
+         */
+        public boolean tryStartFull() {
+            queuedType = FULL;
+            return shouldStart();
+        }
+
+        /**
+         * Mark pending operation PARTIAL if not full and return true if there 
is no in-progress operation
+         *
+         * @return True if we should execute a scan, False if there is already 
one running
+         */
+        public boolean tryStartPartial() {
+            if (queuedType == UNQUEUED) {
+                queuedType = PARTIAL;
+            }
+            return shouldStart();
+        }
+
+        /**
+         * Gets and clears queuedType
+         */
+        public int getAndClearQueuedType() {
+            int ret = queuedType;
+            queuedType = UNQUEUED;
+            return ret;
+        }
+
+        /**
+         * If queuedType != UNQUEUED, returns true, leaves running equal to 
true
+         * Otherwise, returns false and sets running to false
+         */
+        public boolean completeUnlessQueued() {
+            if (queuedType == UNQUEUED) {
+                running = false;
+                return false;
+            } else {
+                return true;
+            }
+        }
+    }
+    private final InstanceState instanceState = new InstanceState();
+
     BookieInfoReader(BookKeeper bk,
-                          ClientConfiguration conf,
-                          ScheduledExecutorService scheduler) {
+                     ClientConfiguration conf,
+                     ScheduledExecutorService scheduler) {
         this.bk = bk;
         this.conf = conf;
         this.scheduler = scheduler;
     }
-    void start() {
+
+    public void start() {
         scheduler.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Running periodic BookieInfo scan");
+                synchronized (BookieInfoReader.this) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Running periodic BookieInfo scan");
+                    }
+                    try {
+                        Collection<BookieSocketAddress> updatedBookies = 
bk.bookieWatcher.getBookies();
+                        bookieInfoMap.updateBookies(updatedBookies);
+                    } catch (BKException e) {
+                        LOG.info("Got exception {} while querying bookies from 
watcher, rerunning after {}s",
 
 Review comment:
   Any reason to put the exception as the first parameter? It is common practice 
in SLF4J to pass the exception as the last argument, and you don't need a '{}' 
placeholder for exceptions.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to