This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7539679  Added column type enum (#1060)
7539679 is described below

commit 75396795ef672174ecea7dfed07ad5a044e3363b
Author: Keith Turner <[email protected]>
AuthorDate: Thu Nov 8 10:37:21 2018 -0500

    Added column type enum (#1060)
---
 .../apache/fluo/accumulo/format/FluoFormatter.java |  46 ++---
 .../iterators/GarbageCollectionIterator.java       | 196 +++++++++++----------
 .../accumulo/iterators/OpenReadLockIterator.java   |  67 +++----
 .../fluo/accumulo/iterators/PrewriteIterator.java  | 179 ++++++++++---------
 .../accumulo/iterators/RollbackCheckIterator.java  | 102 ++++++-----
 .../fluo/accumulo/iterators/SnapshotIterator.java  | 140 ++++++++-------
 .../iterators/TimestampSkippingIterator.java       |   8 +-
 .../apache/fluo/accumulo/util/ColumnConstants.java |  13 +-
 .../org/apache/fluo/accumulo/util/ColumnType.java  |  90 ++++++++++
 .../accumulo/iterators/SnapshotIteratorTest.java   |   8 +-
 .../apache/fluo/accumulo/iterators/TestData.java   |  22 +--
 .../apache/fluo/accumulo/util/ColumnTypeTest.java  |  96 ++++++++++
 .../org/apache/fluo/core/impl/LockResolver.java    |  14 +-
 .../fluo/core/impl/ParallelSnapshotScanner.java    |  30 ++--
 .../org/apache/fluo/core/impl/SnapshotScanner.java |  35 ++--
 .../org/apache/fluo/core/impl/TransactionImpl.java |  27 ++-
 .../java/org/apache/fluo/core/impl/TxInfo.java     |  72 ++++----
 .../java/org/apache/fluo/core/util/ColumnUtil.java |  15 +-
 .../apache/fluo/integration/impl/FailureIT.java    |   9 +-
 .../impl/GarbageCollectionIteratorIT.java          |   9 +-
 .../fluo/mapreduce/FluoKeyValueGenerator.java      |  18 +-
 .../fluo/mapreduce/FluoMutationGenerator.java      |  11 +-
 22 files changed, 713 insertions(+), 494 deletions(-)

diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
index d87975a..e202816 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -21,6 +21,7 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.NotificationUtil;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
@@ -88,34 +89,21 @@ public class FluoFormatter {
     } else {
       long ts = key.getTimestamp();
       String type = "";
-
-      if ((ts & ColumnConstants.PREFIX_MASK) == 
ColumnConstants.TX_DONE_PREFIX) {
-        type = "TX_DONE";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == 
ColumnConstants.DEL_LOCK_PREFIX) {
-        type = "DEL_LOCK";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.LOCK_PREFIX) {
-        type = "LOCK";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.DATA_PREFIX) {
-        type = "DATA";
+      ColumnType colType = ColumnType.from(ts);
+
+      switch (colType) {
+        case RLOCK:
+          if (ReadLockUtil.isDelete(ts)) {
+            type = "DEL_RLOCK";
+          } else {
+            type = "RLOCK";
+          }
+          ts = ReadLockUtil.decodeTs(ts);
+          break;
+        default:
+          type = colType.toString();
+          break;
       }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.WRITE_PREFIX) {
-        type = "WRITE";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.ACK_PREFIX) {
-        type = "ACK";
-      }
-      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
-        if (ReadLockUtil.isDelete(ts)) {
-          type = "DEL_RLOCK";
-        } else {
-          type = "RLOCK";
-        }
-        ts = ReadLockUtil.decodeTs(ts);
-      }
-
 
       StringBuilder sb = new StringBuilder();
 
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index d07f59e..ad4a8aa 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
@@ -125,10 +126,10 @@ public class GarbageCollectionIterator implements 
SortedKeyValueIterator<Key, Va
   private boolean consumeData() throws IOException {
     while (source.hasTop()
         && curCol.equals(source.getTopKey(), 
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
-      long colType = source.getTopKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
       long ts = source.getTopKey().getTimestamp() & 
ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.DATA_PREFIX) {
+      if (colType == ColumnType.DATA) {
         if (ts >= truncationTime && !rolledback.contains(ts)) {
           return false;
         }
@@ -173,132 +174,147 @@ public class GarbageCollectionIterator implements 
SortedKeyValueIterator<Key, Va
       return;
     }
 
-    while (source.hasTop()
+    loop: while (source.hasTop()
         && curCol.equals(source.getTopKey(), 
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
 
-      long colType = source.getTopKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
       long ts = source.getTopKey().getTimestamp() & 
ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        keys.add(source.getTopKey(), source.getTopValue());
-        completeTxs.add(ts);
-      } else if (colType == ColumnConstants.WRITE_PREFIX) {
-        boolean keep = false;
-        boolean complete = completeTxs.contains(ts);
-        byte[] val = source.getTopValue().get();
-        long timePtr = WriteValue.getTimestamp(val);
-
-        if (WriteValue.isPrimary(val) && !complete) {
-          keep = true;
+      switch (colType) {
+        case TX_DONE: {
+          keys.add(source.getTopKey(), source.getTopValue());
+          completeTxs.add(ts);
+          break;
         }
+        case WRITE: {
+          boolean keep = false;
+          boolean complete = completeTxs.contains(ts);
+          byte[] val = source.getTopValue().get();
+          long timePtr = WriteValue.getTimestamp(val);
 
-        if (!oldestSeen) {
-          if (firstWrite == -1) {
-            firstWrite = ts;
+          if (WriteValue.isPrimary(val) && !complete) {
+            keep = true;
           }
 
-          if (ts < gcTimestamp) {
-            oldestSeen = true;
-            truncationTime = timePtr;
-            if (!(WriteValue.isDelete(val) && isFullMajc)) {
+          if (!oldestSeen) {
+            if (firstWrite == -1) {
+              firstWrite = ts;
+            }
+
+            if (ts < gcTimestamp) {
+              oldestSeen = true;
+              truncationTime = timePtr;
+              if (!(WriteValue.isDelete(val) && isFullMajc)) {
+                keep = true;
+              }
+            } else {
               keep = true;
             }
-          } else {
-            keep = true;
           }
-        }
 
-        if (timePtr > invalidationTime) {
-          invalidationTime = timePtr;
-        }
+          if (timePtr > invalidationTime) {
+            invalidationTime = timePtr;
+          }
 
-        if (keep) {
-          keys.add(source.getTopKey(), val);
-        } else if (complete) {
-          completeTxs.remove(ts);
+          if (keep) {
+            keys.add(source.getTopKey(), val);
+          } else if (complete) {
+            completeTxs.remove(ts);
+          }
+          break;
         }
-      } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-        boolean keep = false;
-        long txDoneTs = 
DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
-        boolean complete = completeTxs.contains(txDoneTs);
+        case DEL_LOCK: {
+          boolean keep = false;
+          long txDoneTs = 
DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
+          boolean complete = completeTxs.contains(txDoneTs);
 
-        byte[] val = source.getTopValue().get();
+          byte[] val = source.getTopValue().get();
 
-        if (!complete && DelLockValue.isPrimary(val)) {
-          keep = true;
-        }
+          if (!complete && DelLockValue.isPrimary(val)) {
+            keep = true;
+          }
 
-        if (DelLockValue.isRollback(val)) {
-          rolledback.add(ts);
-          keep |= !isFullMajc;
-        }
+          if (DelLockValue.isRollback(val)) {
+            rolledback.add(ts);
+            keep |= !isFullMajc;
+          }
 
-        if (ts > invalidationTime) {
-          invalidationTime = ts;
-        }
+          if (ts > invalidationTime) {
+            invalidationTime = ts;
+          }
 
-        if (keep) {
-          keys.add(source.getTopKey(), source.getTopValue());
-        } else if (complete) {
-          completeTxs.remove(txDoneTs);
+          if (keep) {
+            keys.add(source.getTopKey(), source.getTopValue());
+          } else if (complete) {
+            completeTxs.remove(txDoneTs);
+          }
+          break;
         }
-      } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-        boolean keep = false;
-        long rlts = ReadLockUtil.decodeTs(ts);
-        boolean isDelete = ReadLockUtil.isDelete(ts);
+        case RLOCK: {
+          boolean keep = false;
+          long rlts = ReadLockUtil.decodeTs(ts);
+          boolean isDelete = ReadLockUtil.isDelete(ts);
 
-        if (isDelete) {
-          lastReadLockDeleteTs = rlts;
-        }
+          if (isDelete) {
+            lastReadLockDeleteTs = rlts;
+          }
 
-        if (rlts > invalidationTime) {
-          if (isFullMajc) {
-            if (isDelete) {
-              if (DelReadLockValue.isRollback(source.getTopValue().get())) {
-                // can drop rolled back read lock delete markers on any full 
majc, do not need to
-                // consider gcTimestamp
-                keep = false;
+          if (rlts > invalidationTime) {
+            if (isFullMajc) {
+              if (isDelete) {
+                if (DelReadLockValue.isRollback(source.getTopValue().get())) {
+                  // can drop rolled back read lock delete markers on any full 
majc, do not need to
+                  // consider gcTimestamp
+                  keep = false;
+                } else {
+                  long rlockCommitTs =
+                      
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+                  keep = rlockCommitTs >= gcTimestamp;
+                }
               } else {
-                long rlockCommitTs =
-                    
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
-                keep = rlockCommitTs >= gcTimestamp;
+                keep = lastReadLockDeleteTs != rlts;
               }
             } else {
-              keep = lastReadLockDeleteTs != rlts;
+              // can drop deleted read lock entries.. keep the delete entry.
+              keep = isDelete || lastReadLockDeleteTs != rlts;
             }
-          } else {
-            // can drop deleted read lock entries.. keep the delete entry.
-            keep = isDelete || lastReadLockDeleteTs != rlts;
           }
-        }
 
-        if (keep) {
-          keys.add(source.getTopKey(), source.getTopValue());
-        }
-      } else if (colType == ColumnConstants.LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          keys.add(source.getTopKey(), source.getTopValue());
+          if (keep) {
+            keys.add(source.getTopKey(), source.getTopValue());
+          }
+          break;
         }
-      } else if (colType == ColumnConstants.DATA_PREFIX) {
-        // can stop looking
-        break;
-      } else if (colType == ColumnConstants.ACK_PREFIX) {
-        if (!sawAck) {
-          if (ts >= firstWrite) {
+        case LOCK: {
+          if (ts > invalidationTime) {
             keys.add(source.getTopKey(), source.getTopValue());
           }
-          sawAck = true;
+          break;
         }
-      } else {
-        throw new IllegalArgumentException(" unknown colType " + 
String.format("%x", colType));
+        case DATA: {
+          // can stop looking
+          break loop;
+        }
+        case ACK: {
+          if (!sawAck) {
+            if (ts >= firstWrite) {
+              keys.add(source.getTopKey(), source.getTopValue());
+            }
+            sawAck = true;
+          }
+          break;
+        }
+
+        default:
+          throw new IllegalArgumentException(" unknown colType " + colType);
+
       }
 
       source.next();
     }
 
     keys.copyTo(keysFiltered, (timestamp -> {
-      long colType = timestamp & ColumnConstants.PREFIX_MASK;
-      if (colType == ColumnConstants.TX_DONE_PREFIX) {
+      if (ColumnType.from(timestamp) == ColumnType.TX_DONE) {
         return completeTxs.contains(timestamp & 
ColumnConstants.TIMESTAMP_MASK);
       } else {
         return true;
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
index dd4de54..167eff7 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -26,17 +26,10 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 
-import static org.apache.fluo.accumulo.util.ColumnConstants.ACK_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.DATA_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.DEL_LOCK_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.LOCK_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
 import static org.apache.fluo.accumulo.util.ColumnConstants.TIMESTAMP_MASK;
-import static org.apache.fluo.accumulo.util.ColumnConstants.TX_DONE_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.WRITE_PREFIX;
 
 public class OpenReadLockIterator implements SortedKeyValueIterator<Key, 
Value> {
 
@@ -47,35 +40,43 @@ public class OpenReadLockIterator implements 
SortedKeyValueIterator<Key, Value>
   private void findTop() throws IOException {
     while (source.hasTop()) {
 
-      long colType = source.getTopKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
 
-      if (colType == TX_DONE_PREFIX || colType == WRITE_PREFIX || colType == 
DEL_LOCK_PREFIX) {
-        source.skipToPrefix(source.getTopKey(), RLOCK_PREFIX);
-        continue;
-      } else if (colType == RLOCK_PREFIX) {
-        if (ReadLockUtil.isDelete(source.getTopKey())) {
-          lastDelete.set(source.getTopKey());
-        } else {
-          if (lastDelete.equals(source.getTopKey(), 
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
-            long ts1 = ReadLockUtil.decodeTs(source.getTopKey().getTimestamp() 
& TIMESTAMP_MASK);
-            long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() & 
TIMESTAMP_MASK);
-
-            if (ts1 != ts2) {
+      switch (colType) {
+        case TX_DONE:
+        case WRITE:
+        case DEL_LOCK: {
+          source.skipToPrefix(source.getTopKey(), ColumnType.RLOCK);
+          break;
+        }
+        case RLOCK: {
+          if (ReadLockUtil.isDelete(source.getTopKey())) {
+            lastDelete.set(source.getTopKey());
+          } else {
+            if (lastDelete.equals(source.getTopKey(), 
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+              long ts1 = 
ReadLockUtil.decodeTs(source.getTopKey().getTimestamp() & TIMESTAMP_MASK);
+              long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() & 
TIMESTAMP_MASK);
+
+              if (ts1 != ts2) {
+                // found a read lock that is not suppressed by a delete read 
lock entry
+                return;
+              }
+            } else {
               // found a read lock that is not suppressed by a delete read 
lock entry
               return;
             }
-          } else {
-            // found a read lock that is not suppressed by a delete read lock 
entry
-            return;
           }
+          source.next();
+          break;
+        }
+        case DATA:
+        case LOCK:
+        case ACK: {
+          source.skipColumn(source.getTopKey());
+          break;
         }
-        source.next();
-        continue;
-      } else if (colType == DATA_PREFIX || colType == LOCK_PREFIX || colType 
== ACK_PREFIX) {
-        source.skipColumn(source.getTopKey());
-        continue;
-      } else {
-        throw new IllegalArgumentException("Unknown column type " + 
source.getTopKey());
+        default:
+          throw new IllegalArgumentException("Unknown column type " + 
source.getTopKey());
       }
     }
   }
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
index b6f9a48..f6de3f8 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
 import org.apache.fluo.accumulo.values.WriteValue;
@@ -102,9 +103,9 @@ public class PrewriteIterator implements 
SortedKeyValueIterator<Key, Value> {
 
     Key endKey = new Key(range.getStartKey());
     if (checkAck) {
-      endKey.setTimestamp(ColumnConstants.DATA_PREFIX | 
ColumnConstants.TIMESTAMP_MASK);
+      endKey.setTimestamp(ColumnType.DATA.first());
     } else {
-      endKey.setTimestamp(ColumnConstants.ACK_PREFIX | 
ColumnConstants.TIMESTAMP_MASK);
+      endKey.setTimestamp(ColumnType.ACK.first());
     }
 
     // Tried seeking directly to WRITE_PREFIX, however this did not work well 
because of how
@@ -120,114 +121,126 @@ public class PrewriteIterator implements 
SortedKeyValueIterator<Key, Value> {
     while (source.hasTop() && 
seekRange.getStartKey().equals(source.getTopKey(),
         PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
 
-      long colType = source.getTopKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
       long ts = source.getTopKey().getTimestamp() & 
ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        // tried to make 1st seek go to WRITE_PREFIX, but this did not allow 
the DeleteIterator to
-        // be removed from the stack so it was slower.
-        source.skipToPrefix(seekRange.getStartKey(), 
ColumnConstants.WRITE_PREFIX);
-      } else if (colType == ColumnConstants.WRITE_PREFIX) {
-        long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
-
-        if (timePtr > invalidationTime) {
-          invalidationTime = timePtr;
+      switch (colType) {
+        case TX_DONE: {
+          // tried to make 1st seek go to WRITE_PREFIX, but this did not allow 
the DeleteIterator to
+          // be removed from the stack so it was slower.
+          source.skipToPrefix(seekRange.getStartKey(), ColumnType.WRITE);
+          break;
         }
+        case WRITE: {
+          long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
 
-        if (ts >= snaptime) {
-          hasTop = true;
-          return;
-        }
-
-        source.skipToPrefix(seekRange.getStartKey(), 
ColumnConstants.DEL_LOCK_PREFIX);
-      } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          invalidationTime = ts;
+          if (timePtr > invalidationTime) {
+            invalidationTime = timePtr;
+          }
 
           if (ts >= snaptime) {
             hasTop = true;
             return;
           }
-        }
 
-        if (readlock) {
-          source.skipToPrefix(seekRange.getStartKey(), 
ColumnConstants.LOCK_PREFIX);
-        } else {
-          source.skipToPrefix(seekRange.getStartKey(), 
ColumnConstants.RLOCK_PREFIX);
+          source.skipToPrefix(seekRange.getStartKey(), ColumnType.DEL_LOCK);
+          break;
         }
-      } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+        case DEL_LOCK: {
+          if (ts > invalidationTime) {
+            invalidationTime = ts;
 
-        long lastDeleteTs = -1;
-        long rlts = ReadLockUtil.decodeTs(ts);
+            if (ts >= snaptime) {
+              hasTop = true;
+              return;
+            }
+          }
 
-        if (!readlock) {
-          while (rlts > invalidationTime && colType == 
ColumnConstants.RLOCK_PREFIX) {
-            if (ReadLockUtil.isDelete(ts)) {
-              // ignore rolled back read locks, these should never prevent a 
write lock
-              if (!DelReadLockValue.isRollback(source.getTopValue().get())) {
-                if (rlts >= snaptime) {
-                  hasTop = true;
-                  return;
-                } else {
-                  long rlockCommitTs =
-                      
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
-                  if (rlockCommitTs > snaptime) {
+          if (readlock) {
+            source.skipToPrefix(seekRange.getStartKey(), ColumnType.LOCK);
+          } else {
+            source.skipToPrefix(seekRange.getStartKey(), ColumnType.RLOCK);
+          }
+          break;
+        }
+        case RLOCK: {
+          long lastDeleteTs = -1;
+          long rlts = ReadLockUtil.decodeTs(ts);
+
+          if (!readlock) {
+            while (rlts > invalidationTime && colType == ColumnType.RLOCK) {
+              if (ReadLockUtil.isDelete(ts)) {
+                // ignore rolled back read locks, these should never prevent a 
write lock
+                if (!DelReadLockValue.isRollback(source.getTopValue().get())) {
+                  if (rlts >= snaptime) {
                     hasTop = true;
                     return;
+                  } else {
+                    long rlockCommitTs =
+                        
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+                    if (rlockCommitTs > snaptime) {
+                      hasTop = true;
+                      return;
+                    }
                   }
                 }
-              }
 
 
-              lastDeleteTs = rlts;
-            } else {
-              if (rlts != lastDeleteTs) {
-                // this read lock is active
-                hasTop = true;
-                return;
+                lastDeleteTs = rlts;
+              } else {
+                if (rlts != lastDeleteTs) {
+                  // this read lock is active
+                  hasTop = true;
+                  return;
+                }
+              }
+
+              source.next();
+              if (source.hasTop()) {
+                colType = ColumnType.from(source.getTopKey());
+                ts = source.getTopKey().getTimestamp() & 
ColumnConstants.TIMESTAMP_MASK;
+                rlts = ReadLockUtil.decodeTs(ts);
+              } else {
+                break;
               }
             }
+          }
 
-            source.next();
-            if (source.hasTop()) {
-              colType = source.getTopKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
-              ts = source.getTopKey().getTimestamp() & 
ColumnConstants.TIMESTAMP_MASK;
-              rlts = ReadLockUtil.decodeTs(ts);
-            } else {
-              break;
-            }
+          if (source.hasTop() && (colType == ColumnType.RLOCK)) {
+            source.skipToPrefix(seekRange.getStartKey(), ColumnType.LOCK);
           }
+          break;
         }
+        case LOCK: {
+          if (ts > invalidationTime) {
+            // nothing supersedes this lock, therefore the column is locked
+            hasTop = true;
+            return;
+          }
 
-        if (source.hasTop() && (colType == ColumnConstants.RLOCK_PREFIX)) {
-          source.skipToPrefix(seekRange.getStartKey(), 
ColumnConstants.LOCK_PREFIX);
-        }
-      } else if (colType == ColumnConstants.LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          // nothing supersedes this lock, therefore the column is locked
-          hasTop = true;
-          return;
+          if (checkAck) {
+            source.skipToPrefix(seekRange.getStartKey(), ColumnType.ACK);
+          } else {
+            // only ack and data left and not interested in either so stop 
looking
+            return;
+          }
+          break;
         }
-
-        if (checkAck) {
-          source.skipToPrefix(seekRange.getStartKey(), 
ColumnConstants.ACK_PREFIX);
-        } else {
-          // only ack and data left and not interested in either so stop 
looking
+        case DATA: {
+          // can stop looking
           return;
         }
-      } else if (colType == ColumnConstants.DATA_PREFIX) {
-        // can stop looking
-        return;
-      } else if (colType == ColumnConstants.ACK_PREFIX) {
-        if (checkAck && ts > ntfyTimestamp) {
-          hasTop = true;
-          return;
-        } else {
-          // nothing else to look at in this column
-          return;
+        case ACK: {
+          if (checkAck && ts > ntfyTimestamp) {
+            hasTop = true;
+            return;
+          } else {
+            // nothing else to look at in this column
+            return;
+          }
         }
-      } else {
-        throw new IllegalArgumentException();
+        default:
+          throw new IllegalArgumentException();
       }
     }
   }
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
index cae383f..7df373f 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.WriteValue;
 
 public class RollbackCheckIterator implements SortedKeyValueIterator<Key, 
Value> {
@@ -91,60 +92,69 @@ public class RollbackCheckIterator implements 
SortedKeyValueIterator<Key, Value>
     hasTop = false;
     while (source.hasTop()
         && curCol.equals(source.getTopKey(), 
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
-      long colType = source.getTopKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(source.getTopKey());
       long ts = source.getTopKey().getTimestamp() & 
ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX);
-        continue;
-      } else if (colType == ColumnConstants.WRITE_PREFIX) {
-        long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
-
-        if (timePtr > invalidationTime) {
-          invalidationTime = timePtr;
+      switch (colType) {
+        case TX_DONE:
+          source.skipToPrefix(curCol, ColumnType.WRITE);
+          continue;
+        case WRITE: {
+          long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
+
+          if (timePtr > invalidationTime) {
+            invalidationTime = timePtr;
+          }
+
+          if (lockTime == timePtr) {
+            hasTop = true;
+            return;
+          }
+
+          if (lockTime > timePtr) {
+            source.skipToPrefix(curCol, ColumnType.DEL_LOCK);
+            continue;
+          }
+          break;
         }
-
-        if (lockTime == timePtr) {
-          hasTop = true;
-          return;
+        case DEL_LOCK: {
+          if (ts > invalidationTime) {
+            invalidationTime = ts;
+          }
+
+          if (ts == lockTime) {
+            hasTop = true;
+            return;
+          }
+
+          if (lockTime > ts) {
+            source.skipToPrefix(curCol, ColumnType.LOCK);
+            continue;
+          }
+          break;
         }
-
-        if (lockTime > timePtr) {
-          source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX);
+        case RLOCK: {
+          source.skipToPrefix(curCol, ColumnType.LOCK);
           continue;
         }
-
-      } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          invalidationTime = ts;
+        case LOCK: {
+          if (ts > invalidationTime) {
+            // nothing supersedes this lock, therefore the column is locked
+            hasTop = true;
+            return;
+          }
+          break;
         }
-
-        if (ts == lockTime) {
-          hasTop = true;
+        case DATA: {
+          // can stop looking
           return;
         }
-
-        if (lockTime > ts) {
-          source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
-          continue;
-        }
-
-      } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-        source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
-        continue;
-      } else if (colType == ColumnConstants.LOCK_PREFIX) {
-        if (ts > invalidationTime) {
-          // nothing supersedes this lock, therefore the column is locked
-          hasTop = true;
-          return;
+        case ACK: {
+          // do nothing if ACK
+          break;
         }
-      } else if (colType == ColumnConstants.DATA_PREFIX) {
-        // can stop looking
-        return;
-      } else if (colType == ColumnConstants.ACK_PREFIX) {
-        // do nothing if ACK
-      } else {
-        throw new IllegalArgumentException();
+        default:
+          throw new IllegalArgumentException();
       }
 
       source.next();
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
index 27e63ef..4b5ab9d 100644
--- 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
+++ 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.WriteValue;
 
 public class SnapshotIterator implements SortedKeyValueIterator<Key, Value> {
@@ -91,87 +92,95 @@ public class SnapshotIterator implements 
SortedKeyValueIterator<Key, Value> {
 
       while (source.hasTop()
           && curCol.equals(source.getTopKey(), 
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
-        long colType = source.getTopKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
+        ColumnType colType = ColumnType.from(source.getTopKey());
         long ts = source.getTopKey().getTimestamp() & 
ColumnConstants.TIMESTAMP_MASK;
 
-        if (colType == ColumnConstants.TX_DONE_PREFIX) {
-          source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX);
-          continue;
-        } else if (colType == ColumnConstants.WRITE_PREFIX) {
-          long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
-
-          if (timePtr > invalidationTime) {
-            invalidationTime = timePtr;
+        switch (colType) {
+          case TX_DONE: {
+            source.skipToPrefix(curCol, ColumnType.WRITE);
+            continue;
           }
+          case WRITE: {
+            long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
 
-          if (dataPointer == -1) {
-            if (ts <= snaptime) {
-              dataPointer = timePtr;
-              source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX);
-              continue;
+            if (timePtr > invalidationTime) {
+              invalidationTime = timePtr;
+            }
+
+            if (dataPointer == -1) {
+              if (ts <= snaptime) {
+                dataPointer = timePtr;
+                source.skipToPrefix(curCol, ColumnType.DEL_LOCK);
+                continue;
+              } else {
+                source.skipToTimestamp(curCol, 
ColumnType.WRITE.enode(snaptime));
+                continue;
+              }
+            }
+            break;
+          }
+          case DEL_LOCK: {
+            if (ts > invalidationTime) {
+              invalidationTime = ts;
+            }
+            if (returnReadLockPresent) {
+              source.skipToPrefix(curCol, ColumnType.RLOCK);
             } else {
-              source.skipToTimestamp(curCol, ColumnConstants.WRITE_PREFIX | 
snaptime);
-              continue;
+              source.skipToPrefix(curCol, ColumnType.LOCK);
             }
+            continue;
           }
-        } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-          if (ts > invalidationTime) {
-            invalidationTime = ts;
+          case RLOCK: {
+            if (returnReadLockPresent) {
+              rememberReadLock(source.getTopKey(), source.getTopValue());
+            }
+
+            source.skipToPrefix(curCol, ColumnType.LOCK);
+            continue;
           }
-          if (returnReadLockPresent) {
-            source.skipToPrefix(curCol, ColumnConstants.RLOCK_PREFIX);
-          } else {
-            source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
+          case LOCK: {
+            if (ts > invalidationTime && ts <= snaptime) {
+              // nothing supersedes this lock, therefore the column is locked
+              return;
+            } else {
+              if (dataPointer == -1) {
+                source.skipColumn(curCol);
+                continue outer;
+              } else {
+                source.skipToTimestamp(curCol, 
ColumnType.DATA.enode(dataPointer));
+                continue;
+              }
+            }
           }
-          continue;
+          case DATA: {
+            if (dataPointer == ts) {
+              // found data for this column
+              return;
+            }
 
-        } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-          if (returnReadLockPresent) {
-            rememberReadLock(source.getTopKey(), source.getTopValue());
-          }
+            if (ts < dataPointer || dataPointer == -1) {
+              source.skipColumn(curCol);
+              continue outer;
+            }
 
-          source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
-          continue;
-        } else if (colType == ColumnConstants.LOCK_PREFIX) {
-          if (ts > invalidationTime && ts <= snaptime) {
-            // nothing supersedes this lock, therefore the column is locked
-            return;
-          } else {
+            if (ts > dataPointer) {
+              source.skipToTimestamp(curCol, 
ColumnType.DATA.enode(dataPointer));
+              continue;
+            }
+            break;
+          }
+          case ACK: {
             if (dataPointer == -1) {
               source.skipColumn(curCol);
               continue outer;
             } else {
-              source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | 
dataPointer);
+              source.skipToTimestamp(curCol, 
ColumnType.DATA.enode(dataPointer));
               continue;
             }
           }
-        } else if (colType == ColumnConstants.DATA_PREFIX) {
-          if (dataPointer == ts) {
-            // found data for this column
-            return;
-          }
-
-          if (ts < dataPointer || dataPointer == -1) {
-            source.skipColumn(curCol);
-            continue outer;
-          }
-
-          if (ts > dataPointer) {
-            source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | 
dataPointer);
-            continue;
-          }
-        } else if (colType == ColumnConstants.ACK_PREFIX) {
-          if (dataPointer == -1) {
-            source.skipColumn(curCol);
-            continue outer;
-          } else {
-            source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | 
dataPointer);
-            continue;
-          }
-        } else {
-          throw new IllegalArgumentException();
+          default:
+            throw new IllegalArgumentException();
         }
-
         // TODO handle case where dataPointer >=0, but no data was found
         source.next();
       }
@@ -220,8 +229,7 @@ public class SnapshotIterator implements 
SortedKeyValueIterator<Key, Value> {
     if (range.getStartKey() != null && range.getStartKey().getTimestamp() != 
Long.MAX_VALUE
         && !range.isStartKeyInclusive()) {
 
-      if ((range.getStartKey().getTimestamp()
-          & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+      if (ColumnType.from(range.getStartKey()) == ColumnType.RLOCK) {
         Key currCol = new Key(range.getStartKey());
         currCol.setTimestamp(Long.MAX_VALUE);
         newRange = new Range(currCol, true, range.getEndKey(), 
range.isEndKeyInclusive());
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
index 3279ec2..534c82d 100644
--- 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
+++ 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 
 /**
  * The purpose of this iterator is to make seeking within a columns timestamp 
range efficient.
@@ -80,10 +80,8 @@ public class TimestampSkippingIterator implements 
SortedKeyValueIterator<Key, Va
     }
   }
 
-  public void skipToPrefix(Key curCol, long prefix) throws IOException {
-    // first possible timestamp in sorted order for this prefix
-    long timestamp = prefix | ColumnConstants.TIMESTAMP_MASK;
-    skipToTimestamp(curCol, timestamp);
+  public void skipToPrefix(Key curCol, ColumnType colType) throws IOException {
+    skipToTimestamp(curCol, colType.first());
   }
 
   public void skipColumn(Key curCol) throws IOException {
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
index 7063adc..16b9f1e 100644
--- 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
+++ 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
@@ -21,20 +21,11 @@ import org.apache.fluo.api.data.Bytes;
  * Constants used extract data from columns
  */
 public class ColumnConstants {
-
-  public static final long PREFIX_MASK = 0xe000000000000000L;
-  public static final long TX_DONE_PREFIX = 0x6000000000000000L;
-  public static final long WRITE_PREFIX = 0x4000000000000000L;
-  public static final long DEL_LOCK_PREFIX = 0x2000000000000000L;
-  public static final long RLOCK_PREFIX = 0x0000000000000000L;
-  public static final long LOCK_PREFIX = 0xe000000000000000L;
-  public static final long ACK_PREFIX = 0xc000000000000000L;
-  public static final long DATA_PREFIX = 0xa000000000000000L;
-  public static final long TIMESTAMP_MASK = 0x1fffffffffffffffL;
+  public static final long PREFIX_MASK = -1L << (64 - ColumnType.BITS);
+  public static final long TIMESTAMP_MASK = -1L >>> ColumnType.BITS;
   public static final Bytes NOTIFY_CF = Bytes.of("ntfy");
   public static final String NOTIFY_LOCALITY_GROUP_NAME = "notify";
   public static final Bytes GC_CF = Bytes.of("gc");
 
   private ColumnConstants() {}
-
 }
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java
new file mode 100644
index 0000000..b16fe6d
--- /dev/null
+++ 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.accumulo.core.data.Key;
+
+/**
+ * Abstracts how the Fluo column type is encoded in Accumulo timestamps.
+ *
+ * <p>
+ * The column type occupies the {@link #BITS} high-order bits of the 64-bit
+ * timestamp; the remaining low-order bits hold the timestamp value itself.
+ */
+public enum ColumnType {
+
+  TX_DONE, WRITE, DEL_LOCK, RLOCK, LOCK, ACK, DATA;
+
+  // This type's code shifted into the high-order bits of a long. It is
+  // assigned once, in the static initializer further down.
+  private long prefix;
+
+  /**
+   * Combines this type's prefix with {@link ColumnConstants#TIMESTAMP_MASK},
+   * so every timestamp bit below the prefix is set.
+   *
+   * @return The first possible timestamp in sorted order.
+   */
+  public long first() {
+    return prefix | ColumnConstants.TIMESTAMP_MASK;
+  }
+
+  /**
+   * Encodes this column type into a timestamp.
+   *
+   * <p>
+   * NOTE(review): the name looks like a typo of "encode"; callers use it as
+   * spelled, so confirm before renaming.
+   *
+   * @param timestamp a timestamp whose {@link #BITS} high-order bits are zero
+   * @return The timestamp with this column type encoded into the high order
+   *         bits.
+   * @throws IllegalArgumentException if any of the high-order bits reserved
+   *         for the column type are already set in {@code timestamp}
+   */
+  public long enode(long timestamp) {
+    Preconditions.checkArgument((timestamp >>> (64 - BITS)) == 0);
+    return prefix | timestamp;
+  }
+
+  // The number of leftmost bits in the timestamp reserved for encoding the
+  // column type
+  static final int BITS = 3;
+  // Unshifted codes for each column type. They are compile-time constants so
+  // that from(long) can use them as case labels. 0x04 is the only value that
+  // fits in BITS bits and is not assigned to a type.
+  private static final byte TX_DONE_PREFIX = 0x03;
+  private static final byte WRITE_PREFIX = 0x02;
+  private static final byte DEL_LOCK_PREFIX = 0x01;
+  private static final byte RLOCK_PREFIX = 0x00;
+  private static final byte LOCK_PREFIX = 0x07;
+  private static final byte ACK_PREFIX = 0x06;
+  private static final byte DATA_PREFIX = 0x05;
+
+  // Shift each code into the top BITS bits of a long. The cast to long happens
+  // before the shift so the shift is done with 64-bit arithmetic.
+  static {
+    TX_DONE.prefix = (long) TX_DONE_PREFIX << (64 - BITS);
+    WRITE.prefix = (long) WRITE_PREFIX << (64 - BITS);
+    DEL_LOCK.prefix = (long) DEL_LOCK_PREFIX << (64 - BITS);
+    RLOCK.prefix = (long) RLOCK_PREFIX << (64 - BITS);
+    LOCK.prefix = (long) LOCK_PREFIX << (64 - BITS);
+    ACK.prefix = (long) ACK_PREFIX << (64 - BITS);
+    DATA.prefix = (long) DATA_PREFIX << (64 - BITS);
+  }
+
+  /**
+   * Decodes the column type from the timestamp of an Accumulo key.
+   *
+   * @param k key whose timestamp carries a column type prefix
+   * @return the column type encoded in the key's timestamp
+   * @throws IllegalArgumentException if the prefix maps to no column type
+   */
+  public static ColumnType from(Key k) {
+    return from(k.getTimestamp());
+  }
+
+  /**
+   * Decodes the column type from the high-order bits of a raw timestamp.
+   *
+   * @param timestamp a timestamp that includes the column type prefix
+   * @return the column type whose code matches the top {@link #BITS} bits
+   * @throws IllegalArgumentException if the prefix maps to no column type
+   */
+  public static ColumnType from(long timestamp) {
+    // unsigned shift, so the result is always in the range 0..7
+    byte prefix = (byte) (timestamp >>> (64 - BITS));
+    switch (prefix) {
+      case TX_DONE_PREFIX:
+        return TX_DONE;
+      case WRITE_PREFIX:
+        return WRITE;
+      case DEL_LOCK_PREFIX:
+        return DEL_LOCK;
+      case RLOCK_PREFIX:
+        return RLOCK;
+      case LOCK_PREFIX:
+        return LOCK;
+      case ACK_PREFIX:
+        return ACK;
+      case DATA_PREFIX:
+        return DATA;
+      default:
+        throw new IllegalArgumentException("Unknown prefix : " + 
Integer.toHexString(prefix));
+    }
+  }
+}
diff --git 
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
 
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
index 417fd81..7358e1e 100644
--- 
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
+++ 
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.SortedMapIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -320,6 +320,6 @@ public class SnapshotIteratorTest {
 
   @Test(expected = IllegalArgumentException.class)
   public void testNonZeroPrefix() {
-    SnapshotIterator.setSnaptime(null, ColumnConstants.DATA_PREFIX | 6);
+    SnapshotIterator.setSnaptime(null, ColumnType.DATA.enode(6));
   }
 }
diff --git 
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
 
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
index dbdcd1b..ad595e0 100644
--- 
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
+++ 
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.format.FluoFormatter;
 import org.apache.fluo.accumulo.iterators.CountingIterator.Counter;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -101,28 +101,28 @@ public class TestData {
 
     switch (ct) {
       case "ACK":
-        ts |= ColumnConstants.ACK_PREFIX;
+        ts = ColumnType.ACK.enode(ts);
         break;
       case "TX_DONE":
-        ts |= ColumnConstants.TX_DONE_PREFIX;
+        ts = ColumnType.TX_DONE.enode(ts);
         break;
       case "WRITE":
-        ts |= ColumnConstants.WRITE_PREFIX;
+        ts = ColumnType.WRITE.enode(ts);
         long writeTs = Long.parseLong(value.split("\\s+")[0]);
         val = WriteValue.encode(writeTs, value.contains("PRIMARY"), 
value.contains("DELETE"));
         break;
       case "LOCK":
-        ts |= ColumnConstants.LOCK_PREFIX;
+        ts = ColumnType.LOCK.enode(ts);
         String rc[] = value.split("\\s+");
         val = LockValue.encode(Bytes.of(rc[0]), new Column(rc[1], rc[2]), 
value.contains("WRITE"),
             value.contains("DELETE"), value.contains("TRIGGER"), 42l);
         break;
       case "DATA":
-        ts |= ColumnConstants.DATA_PREFIX;
+        ts = ColumnType.DATA.enode(ts);
         val = value.getBytes();
         break;
       case "DEL_LOCK":
-        ts |= ColumnConstants.DEL_LOCK_PREFIX;
+        ts = ColumnType.DEL_LOCK.enode(ts);
         if (value.contains("ROLLBACK") || value.contains("ABORT")) {
           val = DelLockValue.encodeRollback(value.contains("PRIMARY"), true);
         } else {
@@ -132,11 +132,11 @@ public class TestData {
         break;
       case "RLOCK":
         ts = ReadLockUtil.encodeTs(ts, false);
-        ts |= ColumnConstants.RLOCK_PREFIX;
+        ts = ColumnType.RLOCK.enode(ts);
         break;
       case "DEL_RLOCK":
         ts = ReadLockUtil.encodeTs(ts, true);
-        ts |= ColumnConstants.RLOCK_PREFIX;
+        ts = ColumnType.RLOCK.enode(ts);
 
         if (value.contains("ROLLBACK") || value.contains("ABORT")) {
           val = DelReadLockValue.encodeRollback();
diff --git 
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java
 
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java
new file mode 100644
index 0000000..3b0b09b
--- /dev/null
+++ 
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.util;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import org.apache.accumulo.core.data.Key;
+import org.junit.Test;
+
+import static org.apache.fluo.accumulo.util.ColumnType.ACK;
+import static org.apache.fluo.accumulo.util.ColumnType.DATA;
+import static org.apache.fluo.accumulo.util.ColumnType.DEL_LOCK;
+import static org.apache.fluo.accumulo.util.ColumnType.LOCK;
+import static org.apache.fluo.accumulo.util.ColumnType.RLOCK;
+import static org.apache.fluo.accumulo.util.ColumnType.TX_DONE;
+import static org.apache.fluo.accumulo.util.ColumnType.WRITE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that {@link ColumnType} encodes and decodes each column type using
+ * the expected prefix in the high-order bits of the timestamp.
+ */
+public class ColumnTypeTest {
+
+  private static final long TIMESTAMP_MASK = 0x1fffffffffffffffL;
+
+  // Prefix-free timestamps that each test combines with every prefix. The last
+  // entry is parenthesized because + binds tighter than <<; written as
+  // 1L << 50 + 1L << 48 it evaluates to 0 and only repeats the first entry.
+  private static final long[] TIMESTAMPS =
+      {0, 2, 13, 19 * 19L, 1L << 50, (1L << 50) + (1L << 48)};
+
+  // map of expected prefixes for a column type
+  private static final Map<Long, ColumnType> EPM;
+
+  static {
+    Builder<Long, ColumnType> builder = ImmutableMap.builder();
+    builder.put(0x6000000000000000L, TX_DONE);
+    builder.put(0x4000000000000000L, WRITE);
+    builder.put(0x2000000000000000L, DEL_LOCK);
+    builder.put(0x0000000000000000L, RLOCK);
+    builder.put(0xe000000000000000L, LOCK);
+    builder.put(0xc000000000000000L, ACK);
+    builder.put(0xa000000000000000L, DATA);
+    EPM = builder.build();
+  }
+
+  /** Encoding a timestamp must OR the type's expected prefix onto it. */
+  @Test
+  public void testPrefix() {
+    for (long l : TIMESTAMPS) {
+      EPM.forEach(
+          (prefix, colType) -> assertEquals(prefix | l, colType.enode(l)));
+    }
+  }
+
+  /**
+   * first() must equal the prefix with every timestamp bit set, and a key
+   * stamped with it must compare lower than a key of the same type stamped
+   * with any of the sample timestamps.
+   */
+  @Test
+  public void testFirst() {
+    EPM.forEach((prefix, colType) -> {
+      assertEquals(prefix | TIMESTAMP_MASK, colType.first());
+    });
+    for (long l : TIMESTAMPS) {
+      EPM.forEach((prefix, colType) -> {
+        Key k1 = new Key("r", "f", "q");
+        k1.setTimestamp(prefix | l);
+        Key k2 = new Key("r", "f", "q");
+        k2.setTimestamp(colType.first());
+        assertTrue(k1.compareTo(k2) > 0);
+      });
+    }
+  }
+
+  /** Decoding from a raw timestamp or from a key must yield the same type. */
+  @Test
+  public void testFrom() {
+    for (long l : TIMESTAMPS) {
+      EPM.forEach((prefix, colType) -> {
+        assertEquals(ColumnType.from(prefix | l), colType);
+        Key k = new Key("r", "f", "q");
+        k.setTimestamp(prefix | l);
+        assertEquals(ColumnType.from(k), colType);
+      });
+    }
+  }
+
+  /** The expected-prefix map must cover every ColumnType constant. */
+  @Test
+  public void testCoverage() {
+    EnumSet<ColumnType> expected = EnumSet.allOf(ColumnType.class);
+    HashSet<ColumnType> actual = new HashSet<>(EPM.values());
+    assertEquals(expected, actual);
+  }
+}
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index ea47bbf..54d2fa0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.OpenReadLockIterator;
 import org.apache.fluo.accumulo.iterators.PrewriteIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -50,7 +51,6 @@ import org.apache.fluo.core.util.ConditionalFlutation;
 import org.apache.fluo.core.util.FluoCondition;
 import org.apache.fluo.core.util.SpanUtil;
 
-import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
 import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
 
 /**
@@ -99,7 +99,7 @@ public class LockResolver {
     public LockInfo(Entry<Key, Value> kve) {
       long rawTs = kve.getKey().getTimestamp();
       this.entry = kve;
-      if ((rawTs & ColumnConstants.PREFIX_MASK) == 
ColumnConstants.RLOCK_PREFIX) {
+      if (ColumnType.from(rawTs) == ColumnType.RLOCK) {
         this.lockTs = ReadLockUtil.decodeTs(rawTs);
         ReadLockValue rlv = new ReadLockValue(kve.getValue().get());
         this.prow = rlv.getPrimaryRow();
@@ -221,11 +221,11 @@ public class LockResolver {
       if (lockInfo.isReadLock) {
         mut.put(k.getColumnFamilyData().toArray(), 
k.getColumnQualifierData().toArray(),
             k.getColumnVisibilityParsed(),
-            ColumnConstants.RLOCK_PREFIX | 
ReadLockUtil.encodeTs(lockInfo.lockTs, true),
+            ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(lockInfo.lockTs, 
true)),
             DelReadLockValue.encodeRollback());
       } else {
         mut.put(k.getColumnFamilyData().toArray(), 
k.getColumnQualifierData().toArray(),
-            k.getColumnVisibilityParsed(), ColumnConstants.DEL_LOCK_PREFIX | 
lockInfo.lockTs,
+            k.getColumnVisibilityParsed(), 
ColumnType.DEL_LOCK.enode(lockInfo.lockTs),
             DelLockValue.encodeRollback(false, true));
       }
     }
@@ -241,7 +241,7 @@ public class LockResolver {
     ConditionalFlutation delLockMutation = new ConditionalFlutation(env, 
prc.prow,
         new FluoCondition(env, 
prc.pcol).setIterators(iterConf).setValue(lockValue));
 
-    delLockMutation.put(prc.pcol, ColumnConstants.DEL_LOCK_PREFIX | 
prc.startTs,
+    delLockMutation.put(prc.pcol, ColumnType.DEL_LOCK.enode(prc.startTs),
         DelLockValue.encodeRollback(true, true));
 
     ConditionalWriter cw = null;
@@ -312,7 +312,7 @@ public class LockResolver {
       for (Column col : e1.getValue()) {
         Key start = SpanUtil.toKey(new RowColumn(e1.getKey(), col));
         Key end = new Key(start);
-        end.setTimestamp(ColumnConstants.LOCK_PREFIX | 
ColumnConstants.TIMESTAMP_MASK);
+        end.setTimestamp(ColumnType.LOCK.first());
         ranges.add(new Range(start, true, end, false));
       }
     }
@@ -329,7 +329,7 @@ public class LockResolver {
 
       List<Entry<Key, Value>> ret = new ArrayList<>();
       for (Entry<Key, Value> entry : bscanner) {
-        if ((entry.getKey().getTimestamp() & PREFIX_MASK) == 
ColumnConstants.RLOCK_PREFIX) {
+        if (ColumnType.from(entry.getKey()) == ColumnType.RLOCK) {
           ret.add(entry);
         }
       }
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
 
b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index f9f1e85..37dc45b 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -32,7 +32,7 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
@@ -176,16 +176,20 @@ public class ParallelSnapshotScanner {
         Bytes row = rowConverter.apply(entry.getKey().getRowData());
         Column col = columnConverter.apply(entry.getKey());
 
-        long colType = entry.getKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
-
-        if (colType == ColumnConstants.LOCK_PREFIX) {
-          locks.add(entry);
-        } else if (colType == ColumnConstants.DATA_PREFIX) {
-          ret.computeIfAbsent(row, k -> new HashMap<>()).put(col, 
Bytes.of(entry.getValue().get()));
-        } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-          readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
-        } else {
-          throw new IllegalArgumentException("Unexpected column type " + 
colType);
+        ColumnType colType = ColumnType.from(entry.getKey());
+        switch (colType) {
+          case LOCK:
+            locks.add(entry);
+            break;
+          case DATA:
+            ret.computeIfAbsent(row, k -> new HashMap<>()).put(col,
+                Bytes.of(entry.getValue().get()));
+            break;
+          case RLOCK:
+            readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
+            break;
+          default:
+            throw new IllegalArgumentException("Unexpected column type " + 
colType);
         }
       }
     } finally {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index 5db7602..f8f83f7 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -31,7 +31,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.SnapshotIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
@@ -172,9 +172,7 @@ public class SnapshotScanner implements Iterable<Entry<Key, 
Value>> {
         while (iterator.hasNext()) {
           Entry<Key, Value> entry = iterator.next();
 
-          long colType = entry.getKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
-
-          if (colType == ColumnConstants.LOCK_PREFIX) {
+          if (ColumnType.from(entry.getKey()) == ColumnType.LOCK) {
             locks.add(entry);
             locksSeen.accept(lockEntry);
           }
@@ -220,18 +218,19 @@ public class SnapshotScanner implements 
Iterable<Entry<Key, Value>> {
 
         Entry<Key, Value> entry = iterator.next();
 
-        long colType = entry.getKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
-
-        if (colType == ColumnConstants.LOCK_PREFIX) {
-          resolveLock(entry);
-          continue mloop;
-        } else if (colType == ColumnConstants.DATA_PREFIX) {
-          stats.incrementEntriesReturned(1);
-          return entry;
-        } else if (colType == ColumnConstants.RLOCK_PREFIX) {
-          return entry;
-        } else {
-          throw new IllegalArgumentException("Unexpected column type " + 
colType);
+        ColumnType colType = ColumnType.from(entry.getKey());
+
+        switch (colType) {
+          case LOCK:
+            resolveLock(entry);
+            continue mloop;
+          case DATA:
+            stats.incrementEntriesReturned(1);
+            return entry;
+          case RLOCK:
+            return entry;
+          default:
+            throw new IllegalArgumentException("Unexpected column type " + 
colType);
         }
       }
     }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index ae87a24..3cbbe5c 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.PrewriteIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -87,8 +88,6 @@ import org.apache.fluo.core.util.UtilWaitThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
-import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
 import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
 import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
 
@@ -278,7 +277,7 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
         continue;
       }
 
-      if ((kve.getKey().getTimestamp() & PREFIX_MASK) == RLOCK_PREFIX) {
+      if (ColumnType.from(kve.getKey()) == ColumnType.RLOCK) {
         if (readLockCols == null) {
           readLockCols = readLocksSeen.computeIfAbsent(row, k -> new 
HashSet<>());
         }
@@ -407,14 +406,14 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     }
 
     if (isWrite(val) && !isDelete(val)) {
-      cm.put(col, ColumnConstants.DATA_PREFIX | startTs, val.toArray());
+      cm.put(col, ColumnType.DATA.enode(startTs), val.toArray());
     }
 
     if (isReadLock(val)) {
-      cm.put(col, ColumnConstants.RLOCK_PREFIX | 
ReadLockUtil.encodeTs(startTs, false),
+      cm.put(col, ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, 
false)),
           ReadLockValue.encode(primaryRow, primaryColumn, getTransactorID()));
     } else {
-      cm.put(col, ColumnConstants.LOCK_PREFIX | startTs, 
LockValue.encode(primaryRow, primaryColumn,
+      cm.put(col, ColumnType.LOCK.enode(startTs), LockValue.encode(primaryRow, 
primaryColumn,
           isWrite(val), isDelete(val), isTriggerRow, getTransactorID()));
     }
 
@@ -668,11 +667,10 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
         if (notification.getColumn().equals(col)) {
           // check to see if ACK exist after notification
           Key startKey = SpanUtil.toKey(notification.getRowColumn());
-          startKey.setTimestamp(
-              ColumnConstants.ACK_PREFIX | (Long.MAX_VALUE & ColumnConstants.TIMESTAMP_MASK));
+          startKey.setTimestamp(ColumnType.ACK.first());
 
           Key endKey = SpanUtil.toKey(notification.getRowColumn());
-          endKey.setTimestamp(ColumnConstants.ACK_PREFIX | (notification.getTimestamp() + 1));
+          endKey.setTimestamp(ColumnType.ACK.enode(notification.getTimestamp() + 1));
 
           Range range = new Range(startKey, endKey);
 
@@ -1112,11 +1110,10 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
         m = new Flutation(env, row);
         for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
           if (isReadLock(entry.getValue())) {
-            m.put(entry.getKey(),
-                ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, 
true),
+            m.put(entry.getKey(), 
ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, true)),
                 DelReadLockValue.encodeRollback());
           } else {
-            m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
+            m.put(entry.getKey(), ColumnType.DEL_LOCK.enode(startTs),
                 DelLockValue.encodeRollback(false, true));
           }
         }
@@ -1134,9 +1131,9 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
       // mark transaction as complete for garbage collection purposes
       Flutation m = new Flutation(env, cd.prow);
 
-      m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+      m.put(cd.pcol, ColumnType.DEL_LOCK.enode(startTs),
           DelLockValue.encodeRollback(startTs, true, true));
-      m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
+      m.put(cd.pcol, ColumnType.TX_DONE.enode(startTs), EMPTY);
 
       return Collections.singletonList(m);
     }
@@ -1392,7 +1389,7 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
 
       Flutation m = new Flutation(env, cd.prow);
       // mark transaction as complete for garbage collection purposes
-      m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY);
+      m.put(cd.pcol, ColumnType.TX_DONE.enode(commitTs), EMPTY);
       afterFlushMutations.add(m);
 
       if (weakNotification != null) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
index 849e2b5..2b6126d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -22,6 +22,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.RollbackCheckIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.WriteValue;
 import org.apache.fluo.api.data.Bytes;
@@ -51,43 +52,50 @@ public class TxInfo {
       return txInfo;
     }
 
-    long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+    ColumnType colType = ColumnType.from(entry.getKey());
     long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
-    if (colType == ColumnConstants.LOCK_PREFIX) {
-      if (ts == startTs) {
-        txInfo.status = TxStatus.LOCKED;
-        txInfo.lockValue = entry.getValue().get();
-      } else {
-        txInfo.status = TxStatus.UNKNOWN; // locked by another tx
+    switch (colType) {
+      case LOCK: {
+        if (ts == startTs) {
+          txInfo.status = TxStatus.LOCKED;
+          txInfo.lockValue = entry.getValue().get();
+        } else {
+          txInfo.status = TxStatus.UNKNOWN; // locked by another tx
+        }
+        break;
       }
-    } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
-      DelLockValue dlv = new DelLockValue(entry.getValue().get());
-
-      if (ts != startTs) {
-        // expect this to always be false, must be a bug in the iterator
-        throw new IllegalStateException(prow + " " + pcol + " (" + ts + " != " + startTs + ") ");
+      case DEL_LOCK: {
+        DelLockValue dlv = new DelLockValue(entry.getValue().get());
+
+        if (ts != startTs) {
+          // expect this to always be false, must be a bug in the iterator
+          throw new IllegalStateException(prow + " " + pcol + " (" + ts + " != " + startTs + ") ");
+        }
+
+        if (dlv.isRollback()) {
+          txInfo.status = TxStatus.ROLLED_BACK;
+        } else {
+          txInfo.status = TxStatus.COMMITTED;
+          txInfo.commitTs = dlv.getCommitTimestamp();
+        }
+        break;
       }
+      case WRITE: {
+        long timePtr = WriteValue.getTimestamp(entry.getValue().get());
 
-      if (dlv.isRollback()) {
-        txInfo.status = TxStatus.ROLLED_BACK;
-      } else {
-        txInfo.status = TxStatus.COMMITTED;
-        txInfo.commitTs = dlv.getCommitTimestamp();
-      }
-    } else if (colType == ColumnConstants.WRITE_PREFIX) {
-      long timePtr = WriteValue.getTimestamp(entry.getValue().get());
+        if (timePtr != startTs) {
+          // expect this to always be false, must be a bug in the iterator
+          throw new IllegalStateException(
+              prow + " " + pcol + " (" + timePtr + " != " + startTs + ") ");
+        }
 
-      if (timePtr != startTs) {
-        // expect this to always be false, must be a bug in the iterator
-        throw new IllegalStateException(
-            prow + " " + pcol + " (" + timePtr + " != " + startTs + ") ");
+        txInfo.status = TxStatus.COMMITTED;
+        txInfo.commitTs = ts;
+        break;
       }
-
-      txInfo.status = TxStatus.COMMITTED;
-      txInfo.commitTs = ts;
-    } else {
-      throw new IllegalStateException("unexpected col type returned " + 
colType);
+      default:
+        throw new IllegalStateException("unexpected col type returned " + 
colType);
     }
 
     return txInfo;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index bbf1d83..9ea5130 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -56,19 +56,18 @@ public class ColumnUtil {
       boolean isWrite, boolean isDelete, boolean isReadlock, long startTs, 
long commitTs,
       Set<Column> observedColumns, Mutation m) {
     if (isReadlock) {
-      Flutation.put(env, m, col,
-          ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+      Flutation.put(env, m, col, 
ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, true)),
           DelReadLockValue.encodeCommit(commitTs));
     } else if (isWrite) {
-      Flutation.put(env, m, col, ColumnConstants.WRITE_PREFIX | commitTs,
+      Flutation.put(env, m, col, ColumnType.WRITE.enode(commitTs),
           WriteValue.encode(startTs, isPrimary, isDelete));
     } else {
-      Flutation.put(env, m, col, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+      Flutation.put(env, m, col, ColumnType.DEL_LOCK.enode(startTs),
           DelLockValue.encodeCommit(commitTs, isPrimary));
     }
 
     if (isTrigger) {
-      Flutation.put(env, m, col, ColumnConstants.ACK_PREFIX | startTs, 
TransactionImpl.EMPTY);
+      Flutation.put(env, m, col, ColumnType.ACK.enode(startTs), 
TransactionImpl.EMPTY);
     }
   }
 
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
index 546cf35..3b475bf 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.LongUtil;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
@@ -638,12 +639,12 @@ public class FailureIT extends ITBaseImpl {
     Scanner scanner = aClient.createScanner(getCurTableName(), 
Authorizations.EMPTY);
 
     for (Entry<Key, Value> entry : scanner) {
-      long colType = entry.getKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(entry.getKey());
       long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
       String row = entry.getKey().getRowData().toString();
       byte[] val = entry.getValue().get();
 
-      if (row.equals(rolledBackRow) && colType == 
ColumnConstants.DEL_LOCK_PREFIX && ts == startTs
+      if (row.equals(rolledBackRow) && colType == ColumnType.DEL_LOCK && ts == 
startTs
           && DelLockValue.isPrimary(val)) {
         sawExpected = true;
       }
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
index b99b16d..03ff986 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.accumulo.format.FluoFormatter;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.client.TransactionBase;
@@ -308,10 +309,10 @@ public class GarbageCollectionIteratorIT extends 
ITBaseImpl {
         numWrites = 0;
       }
 
-      long colType = entry.getKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
+      ColumnType colType = ColumnType.from(entry.getKey());
       long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
-      if (colType == ColumnConstants.WRITE_PREFIX) {
+      if (colType == ColumnType.WRITE) {
         numWrites++;
         if (numWrites > 1) {
           Assert.assertTrue("Extra write had ts " + ts + " < " + oldestTs, ts >= oldestTs);
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
index 1e959ff..3fa156c 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -19,7 +19,7 @@ import java.nio.charset.StandardCharsets;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
 import org.apache.accumulo.core.data.Key;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.WriteValue;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -45,14 +45,14 @@ import org.apache.hadoop.io.Text;
  *   FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
  *   // could also reuse column objects.
  *   Column column = new Column("fam1", "fam2");
- * 
+ *
  *   fkvg.setRow("row1").setColumn(column).setValue("val2");
- * 
+ *
  *   for (FluoKeyValue fluoKeyValue : fkvg.getKeyValues())
  *     writeToAccumuloFile(fluoKeyValue);
- * 
+ *
  *   fkvg.setRow("row2").setColumn(column).setValue("val3");
- * 
+ *
  *   // Each call to getKeyValues() returns the same objects populated with 
different data when
  *   // possible. So subsequent calls to getKeyValues() will create less 
objects. Of course this
  *   // invalidates what was returned by previous calls to getKeyValues().
@@ -187,11 +187,11 @@ public class FluoKeyValueGenerator {
    */
   public FluoKeyValue[] getKeyValues() {
     FluoKeyValue kv = keyVals[0];
-    kv.setKey(new Key(row, fam, qual, vis, ColumnConstants.WRITE_PREFIX | 1));
+    kv.setKey(new Key(row, fam, qual, vis, ColumnType.WRITE.enode(1)));
     kv.getValue().set(WriteValue.encode(0, false, false));
 
     kv = keyVals[1];
-    kv.setKey(new Key(row, fam, qual, vis, ColumnConstants.DATA_PREFIX | 0));
+    kv.setKey(new Key(row, fam, qual, vis, ColumnType.DATA.enode(0)));
     kv.getValue().set(val);
 
     return keyVals;
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
index dd813ab..e09f20e 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -20,7 +20,7 @@ import java.nio.charset.StandardCharsets;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.accumulo.values.WriteValue;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -83,9 +83,8 @@ public class FluoMutationGenerator {
   }
 
   public FluoMutationGenerator put(Column col, byte[] value) {
-    Flutation.put(mutation, col, ColumnConstants.DATA_PREFIX | 0, value);
-    Flutation.put(mutation, col, ColumnConstants.WRITE_PREFIX | 1,
-        WriteValue.encode(0, false, false));
+    Flutation.put(mutation, col, ColumnType.DATA.enode(0), value);
+    Flutation.put(mutation, col, ColumnType.WRITE.enode(1), 
WriteValue.encode(0, false, false));
     return this;
   }
 

Reply via email to