keith-turner commented on code in PR #5484:
URL: https://github.com/apache/accumulo/pull/5484#discussion_r2049130624


##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -202,104 +212,127 @@ private Client(HostAndPort server, 
TabletClientService.Client service) {
       }
     }
 
-    private void sendQueued(int threshhold) {
-      if (queuedDataSize > threshhold || threshhold == 0) {
-        var sendTimer = Timer.startNew();
+    private void sendBackground() {
+      var queue = this.backgroundQueue.get();
+      List<Client> clients = new ArrayList<>();
 
-        List<Client> clients = new ArrayList<>();
-        try {
-
-          // Send load messages to tablet servers spinning up work, but do not 
wait on results.
-          loadQueue.forEach((server, tabletFiles) -> {
+      try {
+        var sendTimer = Timer.startNew();
 
-            if (log.isTraceEnabled()) {
-              log.trace("{} asking {} to bulk import {} files for {} tablets", 
fmtTid, server,
-                  tabletFiles.values().stream().mapToInt(Map::size).sum(), 
tabletFiles.size());
-            }
+        // Send load messages to tablet servers spinning up work, but do not 
wait on results.
+        queue.forEach((server, tabletFiles) -> {
 
-            // Tablet servers process tablets serially and perform a single 
metadata table write for
-            // each tablet. Break the work into per-tablet chunks so it can be 
sent over multiple
-            // connections to the tserver, allowing each chunk to be run in 
parallel on the server
-            // side. This allows multiple threads on a single tserver to do 
metadata writes for this
-            // bulk import.
-            int neededConnections = Math.min(maxConnections, 
tabletFiles.size());
-            List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks =
-                new ArrayList<>(neededConnections);
-            for (int i = 0; i < neededConnections; i++) {
-              chunks.add(new HashMap<>());
-            }
-
-            int nextConnection = 0;
-            for (var entry : tabletFiles.entrySet()) {
-              chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), 
entry.getValue());
-            }
+          if (log.isTraceEnabled()) {
+            log.trace("{} asking {} to bulk import {} files for {} tablets", 
fmtTid, server,
+                tabletFiles.values().stream().mapToInt(Map::size).sum(), 
tabletFiles.size());
+          }
 
-            for (var chunk : chunks) {
-              try {
-                var client = 
ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
-                    manager.getContext(), timeInMillis);
-                // add client to list before calling send in case there is an 
exception, this makes
-                // sure its returned in the finally
-                clients.add(new Client(server, client));
-                client.send_loadFilesV2(TraceUtil.traceInfo(), 
manager.getContext().rpcCreds(), tid,
-                    bulkDir.toString(), chunk, setTime);
-              } catch (TException ex) {
-                log.debug("rpc send failed server: {}, {}", server, fmtTid, 
ex);
-              }
+          // Tablet servers process tablets serially and perform a single 
metadata table write for
+          // each tablet. Break the work into per-tablet chunks so it can be 
sent over multiple
+          // connections to the tserver, allowing each chunk to be run in 
parallel on the server
+          // side. This allows multiple threads on a single tserver to do 
metadata writes for this
+          // bulk import.
+          int neededConnections = Math.min(maxConnections, tabletFiles.size());
+          if (log.isTraceEnabled()) {
+            if (neededConnections == maxConnections) {

Review Comment:
   Because of the `Math.min` call, the `==` check is effectively
   `tabletFiles.size() >= maxConnections`. However, we probably want `>` and
   not `>=`, so I adjusted it to `>` in b6a132c.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to