This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new c90a2a2  Parallelize TransactionImpl.readUnread() (#1080)
c90a2a2 is described below

commit c90a2a2ce843e685da4fb8e32224be97c8bbc6ee
Author: Joseph Koshakow <[email protected]>
AuthorDate: Wed Oct 30 09:30:19 2019 -0400

    Parallelize TransactionImpl.readUnread() (#1080)
    
    When a transaction only writes to a row+col and then has a collision
    Fluo will read the row+col after the collision to look for orphaned
    locks. This commit parallelizes this behavior by reading all row+cols
    at once.
    
    This commit accomplishes this by using a ParallelSnapshotScanner
    instead of a SnapshotScanner.
    
    Fixes #948
---
 .../fluo/core/impl/ParallelSnapshotScanner.java    | 10 ++-
 .../org/apache/fluo/core/impl/SnapshotScanner.java |  8 +--
 .../org/apache/fluo/core/impl/TransactionImpl.java | 83 +++++++++++-----------
 3 files changed, 52 insertions(+), 49 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
 
b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index 9847580..4d2b37c 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.apache.accumulo.core.client.BatchScanner;
@@ -53,9 +54,11 @@ public class ParallelSnapshotScanner {
   private Function<ByteSequence, Bytes> rowConverter;
   private Function<Key, Column> columnConverter;
   private Map<Bytes, Set<Column>> readLocksSeen;
+  private Consumer<Entry<Key, Value>> writeLocksSeen;
 
   ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, 
Environment env,
-      long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen) {
+      long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen,
+      Consumer<Entry<Key, Value>> writeLocksSeen) {
     this.rows = rows;
     this.columns = columns;
     this.env = env;
@@ -64,10 +67,11 @@ public class ParallelSnapshotScanner {
     this.rowConverter = new CachedBytesConverter(rows);
     this.columnConverter = new CachedColumnConverter(columns);
     this.readLocksSeen = readLocksSeen;
+    this.writeLocksSeen = writeLocksSeen;
   }
 
   ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long 
startTs, TxStats stats,
-      Map<Bytes, Set<Column>> readLocksSeen) {
+      Map<Bytes, Set<Column>> readLocksSeen, Consumer<Entry<Key, Value>> 
writeLocksSeen) {
     for (RowColumn rc : cells) {
       byte[] r = rc.getRow().toArray();
       byte[] cf = rc.getColumn().getFamily().toArray();
@@ -87,6 +91,7 @@ public class ParallelSnapshotScanner {
     this.rowConverter = ByteUtil::toBytes;
     this.columnConverter = ColumnUtil::convert;
     this.readLocksSeen = readLocksSeen;
+    this.writeLocksSeen = writeLocksSeen;
   }
 
   private BatchScanner setupBatchScanner() {
@@ -180,6 +185,7 @@ public class ParallelSnapshotScanner {
         switch (colType) {
           case LOCK:
             locks.add(entry);
+            writeLocksSeen.accept(entry);
             break;
           case DATA:
             ret.computeIfAbsent(row, k -> new HashMap<>()).put(col,
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index f8f83f7..8397c15 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -75,7 +75,6 @@ public class SnapshotScanner implements Iterable<Entry<Key, 
Value>> {
   private final Environment env;
   private final TxStats stats;
   private final Opts config;
-  private Consumer<Entry<Key, Value>> locksSeen;
 
   static final long INITIAL_WAIT_TIME = 50;
   // TODO make configurable
@@ -155,8 +154,6 @@ public class SnapshotScanner implements Iterable<Entry<Key, 
Value>> {
 
       // read ahead a little bit looking for other locks to resolve
 
-      locksSeen.accept(lockEntry);
-
       long startTime = System.currentTimeMillis();
       long waitTime = INITIAL_WAIT_TIME;
 
@@ -174,7 +171,6 @@ public class SnapshotScanner implements Iterable<Entry<Key, 
Value>> {
 
           if (ColumnType.from(entry.getKey()) == ColumnType.LOCK) {
             locks.add(entry);
-            locksSeen.accept(lockEntry);
           }
 
           amountRead += entry.getKey().getSize() + entry.getValue().getSize();
@@ -241,13 +237,11 @@ public class SnapshotScanner implements 
Iterable<Entry<Key, Value>> {
     }
   }
 
-  SnapshotScanner(Environment env, Opts config, long startTs, TxStats stats,
-      Consumer<Entry<Key, Value>> locksSeen) {
+  SnapshotScanner(Environment env, Opts config, long startTs, TxStats stats) {
     this.env = env;
     this.config = config;
     this.startTs = startTs;
     this.stats = stats;
-    this.locksSeen = locksSeen;
   }
 
   @Override
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index b898492..8201a12 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -187,8 +187,7 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
   @Override
   public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
     checkIfOpen();
-    return getImpl(row, columns, kve -> {
-    });
+    return getImpl(row, columns);
   }
 
   @Override
@@ -202,7 +201,8 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     env.getSharedResources().getVisCache().validate(columns);
 
     ParallelSnapshotScanner pss =
-        new ParallelSnapshotScanner(rows, columns, env, startTs, stats, 
readLocksSeen);
+        new ParallelSnapshotScanner(rows, columns, env, startTs, stats, 
readLocksSeen, kve -> {
+        });
 
     Map<Bytes, Map<Column, Bytes>> ret = pss.scan();
 
@@ -216,29 +216,11 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
   @Override
   public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
     checkIfOpen();
-
-    if (rowColumns.isEmpty()) {
-      return Collections.emptyMap();
-    }
-
-    ParallelSnapshotScanner pss =
-        new ParallelSnapshotScanner(rowColumns, env, startTs, stats, 
readLocksSeen);
-
-    Map<Bytes, Map<Column, Bytes>> scan = pss.scan();
-    Map<RowColumn, Bytes> ret = new HashMap<>();
-
-    for (Entry<Bytes, Map<Column, Bytes>> entry : scan.entrySet()) {
-      updateColumnsRead(entry.getKey(), entry.getValue().keySet());
-      for (Entry<Column, Bytes> colVal : entry.getValue().entrySet()) {
-        ret.put(new RowColumn(entry.getKey(), colVal.getKey()), 
colVal.getValue());
-      }
-    }
-
-    return ret;
+    return getImpl(rowColumns, kve -> {
+    });
   }
 
-  private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns,
-      Consumer<Entry<Key, Value>> locksSeen) {
+  private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) {
 
     // TODO push visibility filtering to server side?
 
@@ -270,7 +252,7 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     Map<Column, Bytes> ret = new HashMap<>();
     Set<Column> readLockCols = null;
 
-    for (Entry<Key, Value> kve : new SnapshotScanner(env, opts, startTs, 
stats, locksSeen)) {
+    for (Entry<Key, Value> kve : new SnapshotScanner(env, opts, startTs, 
stats)) {
 
       Column col = ColumnUtil.convert(kve.getKey());
       if (shouldCopy && !columns.contains(col)) {
@@ -293,6 +275,28 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     return ret;
   }
 
+  private Map<RowColumn, Bytes> getImpl(Collection<RowColumn> rowColumns,
+      Consumer<Entry<Key, Value>> writeLocksSeen) {
+    if (rowColumns.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    ParallelSnapshotScanner pss =
+        new ParallelSnapshotScanner(rowColumns, env, startTs, stats, 
readLocksSeen, writeLocksSeen);
+
+    Map<Bytes, Map<Column, Bytes>> scan = pss.scan();
+    Map<RowColumn, Bytes> ret = new HashMap<>();
+
+    for (Entry<Bytes, Map<Column, Bytes>> entry : scan.entrySet()) {
+      updateColumnsRead(entry.getKey(), entry.getValue().keySet());
+      for (Entry<Column, Bytes> colVal : entry.getValue().entrySet()) {
+        ret.put(new RowColumn(entry.getKey(), colVal.getKey()), 
colVal.getValue());
+      }
+    }
+
+    return ret;
+  }
+
   @Override
   public ScannerBuilder scanner() {
     checkIfOpen();
@@ -542,7 +546,7 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
    * This function helps handle the following case
    *
    * <OL>
-   * <LI>TX1 locls r1 col1
+   * <LI>TX1 locks r1 col1
    * <LI>TX1 fails before unlocking
    * <LI>TX2 attempts to write r1:col1 w/o reading it
    * </OL>
@@ -554,28 +558,29 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
    *
    * @param cd Commit data
    */
-  private void readUnread(CommitData cd, Consumer<Entry<Key, Value>> 
locksSeen) {
-    // TODO make async
+  private void readUnread(CommitData cd, Consumer<Entry<Key, Value>> 
writeLocksSeen) {
     // TODO need to keep track of ranges read (not ranges passed in, but 
actual data read... user
     // may not iterate over entire range
-    Map<Bytes, Set<Column>> columnsToRead = new HashMap<>();
+    Collection<RowColumn> rowColumnsToRead = new ArrayList<>();
 
     for (Entry<Bytes, Set<Column>> entry : cd.getRejected().entrySet()) {
       Set<Column> rowColsRead = columnsRead.get(entry.getKey());
       if (rowColsRead == null) {
-        columnsToRead.put(entry.getKey(), entry.getValue());
+        for (Column column : entry.getValue()) {
+          rowColumnsToRead.add(new RowColumn(entry.getKey(), column));
+        }
       } else {
         HashSet<Column> colsToRead = new HashSet<>(entry.getValue());
         colsToRead.removeAll(rowColsRead);
         if (!colsToRead.isEmpty()) {
-          columnsToRead.put(entry.getKey(), colsToRead);
+          for (Column column : colsToRead) {
+            rowColumnsToRead.add(new RowColumn(entry.getKey(), column));
+          }
         }
       }
     }
 
-    for (Entry<Bytes, Set<Column>> entry : columnsToRead.entrySet()) {
-      getImpl(entry.getKey(), entry.getValue(), locksSeen);
-    }
+    getImpl(rowColumnsToRead, writeLocksSeen);
   }
 
   private void checkForOrphanedReadLocks(CommitData cd, Map<Bytes, 
Set<Column>> locksResolved)
@@ -642,15 +647,15 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
 
   private void checkForOrphanedLocks(CommitData cd) throws Exception {
 
-    Map<Bytes, Set<Column>> locksSeen = new HashMap<>();
+    Map<Bytes, Set<Column>> writeLocksSeen = new HashMap<>();
 
     readUnread(cd, kve -> {
       Bytes row = ByteUtil.toBytes(kve.getKey().getRowData());
       Column col = ColumnUtil.convert(kve.getKey());
-      locksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
+      writeLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
     });
 
-    checkForOrphanedReadLocks(cd, locksSeen);
+    checkForOrphanedReadLocks(cd, writeLocksSeen);
   }
 
   private boolean checkForAckCollision(ConditionalMutation cm) {
@@ -1532,8 +1537,6 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
   }
 
   public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> 
columns) {
-    return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, 
false), startTs, stats,
-        kve -> {
-        });
+    return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, 
false), startTs, stats);
   }
 }

Reply via email to