This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push:
new 7539679 Added column type enum (#1060)
7539679 is described below
commit 75396795ef672174ecea7dfed07ad5a044e3363b
Author: Keith Turner <[email protected]>
AuthorDate: Thu Nov 8 10:37:21 2018 -0500
Added column type enum (#1060)
---
.../apache/fluo/accumulo/format/FluoFormatter.java | 46 ++---
.../iterators/GarbageCollectionIterator.java | 196 +++++++++++----------
.../accumulo/iterators/OpenReadLockIterator.java | 67 +++----
.../fluo/accumulo/iterators/PrewriteIterator.java | 179 ++++++++++---------
.../accumulo/iterators/RollbackCheckIterator.java | 102 ++++++-----
.../fluo/accumulo/iterators/SnapshotIterator.java | 140 ++++++++-------
.../iterators/TimestampSkippingIterator.java | 8 +-
.../apache/fluo/accumulo/util/ColumnConstants.java | 13 +-
.../org/apache/fluo/accumulo/util/ColumnType.java | 90 ++++++++++
.../accumulo/iterators/SnapshotIteratorTest.java | 8 +-
.../apache/fluo/accumulo/iterators/TestData.java | 22 +--
.../apache/fluo/accumulo/util/ColumnTypeTest.java | 96 ++++++++++
.../org/apache/fluo/core/impl/LockResolver.java | 14 +-
.../fluo/core/impl/ParallelSnapshotScanner.java | 30 ++--
.../org/apache/fluo/core/impl/SnapshotScanner.java | 35 ++--
.../org/apache/fluo/core/impl/TransactionImpl.java | 27 ++-
.../java/org/apache/fluo/core/impl/TxInfo.java | 72 ++++----
.../java/org/apache/fluo/core/util/ColumnUtil.java | 15 +-
.../apache/fluo/integration/impl/FailureIT.java | 9 +-
.../impl/GarbageCollectionIteratorIT.java | 9 +-
.../fluo/mapreduce/FluoKeyValueGenerator.java | 18 +-
.../fluo/mapreduce/FluoMutationGenerator.java | 11 +-
22 files changed, 713 insertions(+), 494 deletions(-)
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
index d87975a..e202816 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -21,6 +21,7 @@ import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
@@ -88,34 +89,21 @@ public class FluoFormatter {
} else {
long ts = key.getTimestamp();
String type = "";
-
- if ((ts & ColumnConstants.PREFIX_MASK) ==
ColumnConstants.TX_DONE_PREFIX) {
- type = "TX_DONE";
- }
- if ((ts & ColumnConstants.PREFIX_MASK) ==
ColumnConstants.DEL_LOCK_PREFIX) {
- type = "DEL_LOCK";
- }
- if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.LOCK_PREFIX) {
- type = "LOCK";
- }
- if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.DATA_PREFIX) {
- type = "DATA";
+ ColumnType colType = ColumnType.from(ts);
+
+ switch (colType) {
+ case RLOCK:
+ if (ReadLockUtil.isDelete(ts)) {
+ type = "DEL_RLOCK";
+ } else {
+ type = "RLOCK";
+ }
+ ts = ReadLockUtil.decodeTs(ts);
+ break;
+ default:
+ type = colType.toString();
+ break;
}
- if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.WRITE_PREFIX) {
- type = "WRITE";
- }
- if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.ACK_PREFIX) {
- type = "ACK";
- }
- if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
- if (ReadLockUtil.isDelete(ts)) {
- type = "DEL_RLOCK";
- } else {
- type = "RLOCK";
- }
- ts = ReadLockUtil.decodeTs(ts);
- }
-
StringBuilder sb = new StringBuilder();
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index d07f59e..ad4a8aa 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
@@ -125,10 +126,10 @@ public class GarbageCollectionIterator implements
SortedKeyValueIterator<Key, Va
private boolean consumeData() throws IOException {
while (source.hasTop()
&& curCol.equals(source.getTopKey(),
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
- long colType = source.getTopKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
+ ColumnType colType = ColumnType.from(source.getTopKey());
long ts = source.getTopKey().getTimestamp() &
ColumnConstants.TIMESTAMP_MASK;
- if (colType == ColumnConstants.DATA_PREFIX) {
+ if (colType == ColumnType.DATA) {
if (ts >= truncationTime && !rolledback.contains(ts)) {
return false;
}
@@ -173,132 +174,147 @@ public class GarbageCollectionIterator implements
SortedKeyValueIterator<Key, Va
return;
}
- while (source.hasTop()
+ loop: while (source.hasTop()
&& curCol.equals(source.getTopKey(),
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
- long colType = source.getTopKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
+ ColumnType colType = ColumnType.from(source.getTopKey());
long ts = source.getTopKey().getTimestamp() &
ColumnConstants.TIMESTAMP_MASK;
- if (colType == ColumnConstants.TX_DONE_PREFIX) {
- keys.add(source.getTopKey(), source.getTopValue());
- completeTxs.add(ts);
- } else if (colType == ColumnConstants.WRITE_PREFIX) {
- boolean keep = false;
- boolean complete = completeTxs.contains(ts);
- byte[] val = source.getTopValue().get();
- long timePtr = WriteValue.getTimestamp(val);
-
- if (WriteValue.isPrimary(val) && !complete) {
- keep = true;
+ switch (colType) {
+ case TX_DONE: {
+ keys.add(source.getTopKey(), source.getTopValue());
+ completeTxs.add(ts);
+ break;
}
+ case WRITE: {
+ boolean keep = false;
+ boolean complete = completeTxs.contains(ts);
+ byte[] val = source.getTopValue().get();
+ long timePtr = WriteValue.getTimestamp(val);
- if (!oldestSeen) {
- if (firstWrite == -1) {
- firstWrite = ts;
+ if (WriteValue.isPrimary(val) && !complete) {
+ keep = true;
}
- if (ts < gcTimestamp) {
- oldestSeen = true;
- truncationTime = timePtr;
- if (!(WriteValue.isDelete(val) && isFullMajc)) {
+ if (!oldestSeen) {
+ if (firstWrite == -1) {
+ firstWrite = ts;
+ }
+
+ if (ts < gcTimestamp) {
+ oldestSeen = true;
+ truncationTime = timePtr;
+ if (!(WriteValue.isDelete(val) && isFullMajc)) {
+ keep = true;
+ }
+ } else {
keep = true;
}
- } else {
- keep = true;
}
- }
- if (timePtr > invalidationTime) {
- invalidationTime = timePtr;
- }
+ if (timePtr > invalidationTime) {
+ invalidationTime = timePtr;
+ }
- if (keep) {
- keys.add(source.getTopKey(), val);
- } else if (complete) {
- completeTxs.remove(ts);
+ if (keep) {
+ keys.add(source.getTopKey(), val);
+ } else if (complete) {
+ completeTxs.remove(ts);
+ }
+ break;
}
- } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
- boolean keep = false;
- long txDoneTs =
DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
- boolean complete = completeTxs.contains(txDoneTs);
+ case DEL_LOCK: {
+ boolean keep = false;
+ long txDoneTs =
DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
+ boolean complete = completeTxs.contains(txDoneTs);
- byte[] val = source.getTopValue().get();
+ byte[] val = source.getTopValue().get();
- if (!complete && DelLockValue.isPrimary(val)) {
- keep = true;
- }
+ if (!complete && DelLockValue.isPrimary(val)) {
+ keep = true;
+ }
- if (DelLockValue.isRollback(val)) {
- rolledback.add(ts);
- keep |= !isFullMajc;
- }
+ if (DelLockValue.isRollback(val)) {
+ rolledback.add(ts);
+ keep |= !isFullMajc;
+ }
- if (ts > invalidationTime) {
- invalidationTime = ts;
- }
+ if (ts > invalidationTime) {
+ invalidationTime = ts;
+ }
- if (keep) {
- keys.add(source.getTopKey(), source.getTopValue());
- } else if (complete) {
- completeTxs.remove(txDoneTs);
+ if (keep) {
+ keys.add(source.getTopKey(), source.getTopValue());
+ } else if (complete) {
+ completeTxs.remove(txDoneTs);
+ }
+ break;
}
- } else if (colType == ColumnConstants.RLOCK_PREFIX) {
- boolean keep = false;
- long rlts = ReadLockUtil.decodeTs(ts);
- boolean isDelete = ReadLockUtil.isDelete(ts);
+ case RLOCK: {
+ boolean keep = false;
+ long rlts = ReadLockUtil.decodeTs(ts);
+ boolean isDelete = ReadLockUtil.isDelete(ts);
- if (isDelete) {
- lastReadLockDeleteTs = rlts;
- }
+ if (isDelete) {
+ lastReadLockDeleteTs = rlts;
+ }
- if (rlts > invalidationTime) {
- if (isFullMajc) {
- if (isDelete) {
- if (DelReadLockValue.isRollback(source.getTopValue().get())) {
- // can drop rolled back read lock delete markers on any full
majc, do not need to
- // consider gcTimestamp
- keep = false;
+ if (rlts > invalidationTime) {
+ if (isFullMajc) {
+ if (isDelete) {
+ if (DelReadLockValue.isRollback(source.getTopValue().get())) {
+ // can drop rolled back read lock delete markers on any full
majc, do not need to
+ // consider gcTimestamp
+ keep = false;
+ } else {
+ long rlockCommitTs =
+
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+ keep = rlockCommitTs >= gcTimestamp;
+ }
} else {
- long rlockCommitTs =
-
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
- keep = rlockCommitTs >= gcTimestamp;
+ keep = lastReadLockDeleteTs != rlts;
}
} else {
- keep = lastReadLockDeleteTs != rlts;
+ // can drop deleted read lock entries.. keep the delete entry.
+ keep = isDelete || lastReadLockDeleteTs != rlts;
}
- } else {
- // can drop deleted read lock entries.. keep the delete entry.
- keep = isDelete || lastReadLockDeleteTs != rlts;
}
- }
- if (keep) {
- keys.add(source.getTopKey(), source.getTopValue());
- }
- } else if (colType == ColumnConstants.LOCK_PREFIX) {
- if (ts > invalidationTime) {
- keys.add(source.getTopKey(), source.getTopValue());
+ if (keep) {
+ keys.add(source.getTopKey(), source.getTopValue());
+ }
+ break;
}
- } else if (colType == ColumnConstants.DATA_PREFIX) {
- // can stop looking
- break;
- } else if (colType == ColumnConstants.ACK_PREFIX) {
- if (!sawAck) {
- if (ts >= firstWrite) {
+ case LOCK: {
+ if (ts > invalidationTime) {
keys.add(source.getTopKey(), source.getTopValue());
}
- sawAck = true;
+ break;
}
- } else {
- throw new IllegalArgumentException(" unknown colType " +
String.format("%x", colType));
+ case DATA: {
+ // can stop looking
+ break loop;
+ }
+ case ACK: {
+ if (!sawAck) {
+ if (ts >= firstWrite) {
+ keys.add(source.getTopKey(), source.getTopValue());
+ }
+ sawAck = true;
+ }
+ break;
+ }
+
+ default:
+ throw new IllegalArgumentException(" unknown colType " + colType);
+
}
source.next();
}
keys.copyTo(keysFiltered, (timestamp -> {
- long colType = timestamp & ColumnConstants.PREFIX_MASK;
- if (colType == ColumnConstants.TX_DONE_PREFIX) {
+ if (ColumnType.from(timestamp) == ColumnType.TX_DONE) {
return completeTxs.contains(timestamp &
ColumnConstants.TIMESTAMP_MASK);
} else {
return true;
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
index dd4de54..167eff7 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -26,17 +26,10 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ReadLockUtil;
-import static org.apache.fluo.accumulo.util.ColumnConstants.ACK_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.DATA_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.DEL_LOCK_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.LOCK_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
import static org.apache.fluo.accumulo.util.ColumnConstants.TIMESTAMP_MASK;
-import static org.apache.fluo.accumulo.util.ColumnConstants.TX_DONE_PREFIX;
-import static org.apache.fluo.accumulo.util.ColumnConstants.WRITE_PREFIX;
public class OpenReadLockIterator implements SortedKeyValueIterator<Key,
Value> {
@@ -47,35 +40,43 @@ public class OpenReadLockIterator implements
SortedKeyValueIterator<Key, Value>
private void findTop() throws IOException {
while (source.hasTop()) {
- long colType = source.getTopKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
+ ColumnType colType = ColumnType.from(source.getTopKey());
- if (colType == TX_DONE_PREFIX || colType == WRITE_PREFIX || colType ==
DEL_LOCK_PREFIX) {
- source.skipToPrefix(source.getTopKey(), RLOCK_PREFIX);
- continue;
- } else if (colType == RLOCK_PREFIX) {
- if (ReadLockUtil.isDelete(source.getTopKey())) {
- lastDelete.set(source.getTopKey());
- } else {
- if (lastDelete.equals(source.getTopKey(),
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
- long ts1 = ReadLockUtil.decodeTs(source.getTopKey().getTimestamp()
& TIMESTAMP_MASK);
- long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() &
TIMESTAMP_MASK);
-
- if (ts1 != ts2) {
+ switch (colType) {
+ case TX_DONE:
+ case WRITE:
+ case DEL_LOCK: {
+ source.skipToPrefix(source.getTopKey(), ColumnType.RLOCK);
+ break;
+ }
+ case RLOCK: {
+ if (ReadLockUtil.isDelete(source.getTopKey())) {
+ lastDelete.set(source.getTopKey());
+ } else {
+ if (lastDelete.equals(source.getTopKey(),
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+ long ts1 =
ReadLockUtil.decodeTs(source.getTopKey().getTimestamp() & TIMESTAMP_MASK);
+ long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() &
TIMESTAMP_MASK);
+
+ if (ts1 != ts2) {
+ // found a read lock that is not suppressed by a delete read
lock entry
+ return;
+ }
+ } else {
// found a read lock that is not suppressed by a delete read
lock entry
return;
}
- } else {
- // found a read lock that is not suppressed by a delete read lock
entry
- return;
}
+ source.next();
+ break;
+ }
+ case DATA:
+ case LOCK:
+ case ACK: {
+ source.skipColumn(source.getTopKey());
+ break;
}
- source.next();
- continue;
- } else if (colType == DATA_PREFIX || colType == LOCK_PREFIX || colType
== ACK_PREFIX) {
- source.skipColumn(source.getTopKey());
- continue;
- } else {
- throw new IllegalArgumentException("Unknown column type " +
source.getTopKey());
+ default:
+ throw new IllegalArgumentException("Unknown column type " +
source.getTopKey());
}
}
}
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
index b6f9a48..f6de3f8 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.WriteValue;
@@ -102,9 +103,9 @@ public class PrewriteIterator implements
SortedKeyValueIterator<Key, Value> {
Key endKey = new Key(range.getStartKey());
if (checkAck) {
- endKey.setTimestamp(ColumnConstants.DATA_PREFIX |
ColumnConstants.TIMESTAMP_MASK);
+ endKey.setTimestamp(ColumnType.DATA.first());
} else {
- endKey.setTimestamp(ColumnConstants.ACK_PREFIX |
ColumnConstants.TIMESTAMP_MASK);
+ endKey.setTimestamp(ColumnType.ACK.first());
}
// Tried seeking directly to WRITE_PREFIX, however this did not work well
because of how
@@ -120,114 +121,126 @@ public class PrewriteIterator implements
SortedKeyValueIterator<Key, Value> {
while (source.hasTop() &&
seekRange.getStartKey().equals(source.getTopKey(),
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
- long colType = source.getTopKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
+ ColumnType colType = ColumnType.from(source.getTopKey());
long ts = source.getTopKey().getTimestamp() &
ColumnConstants.TIMESTAMP_MASK;
- if (colType == ColumnConstants.TX_DONE_PREFIX) {
- // tried to make 1st seek go to WRITE_PREFIX, but this did not allow
the DeleteIterator to
- // be removed from the stack so it was slower.
- source.skipToPrefix(seekRange.getStartKey(),
ColumnConstants.WRITE_PREFIX);
- } else if (colType == ColumnConstants.WRITE_PREFIX) {
- long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
-
- if (timePtr > invalidationTime) {
- invalidationTime = timePtr;
+ switch (colType) {
+ case TX_DONE: {
+ // tried to make 1st seek go to WRITE_PREFIX, but this did not allow
the DeleteIterator to
+ // be removed from the stack so it was slower.
+ source.skipToPrefix(seekRange.getStartKey(), ColumnType.WRITE);
+ break;
}
+ case WRITE: {
+ long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
- if (ts >= snaptime) {
- hasTop = true;
- return;
- }
-
- source.skipToPrefix(seekRange.getStartKey(),
ColumnConstants.DEL_LOCK_PREFIX);
- } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
- if (ts > invalidationTime) {
- invalidationTime = ts;
+ if (timePtr > invalidationTime) {
+ invalidationTime = timePtr;
+ }
if (ts >= snaptime) {
hasTop = true;
return;
}
- }
- if (readlock) {
- source.skipToPrefix(seekRange.getStartKey(),
ColumnConstants.LOCK_PREFIX);
- } else {
- source.skipToPrefix(seekRange.getStartKey(),
ColumnConstants.RLOCK_PREFIX);
+ source.skipToPrefix(seekRange.getStartKey(), ColumnType.DEL_LOCK);
+ break;
}
- } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+ case DEL_LOCK: {
+ if (ts > invalidationTime) {
+ invalidationTime = ts;
- long lastDeleteTs = -1;
- long rlts = ReadLockUtil.decodeTs(ts);
+ if (ts >= snaptime) {
+ hasTop = true;
+ return;
+ }
+ }
- if (!readlock) {
- while (rlts > invalidationTime && colType ==
ColumnConstants.RLOCK_PREFIX) {
- if (ReadLockUtil.isDelete(ts)) {
- // ignore rolled back read locks, these should never prevent a
write lock
- if (!DelReadLockValue.isRollback(source.getTopValue().get())) {
- if (rlts >= snaptime) {
- hasTop = true;
- return;
- } else {
- long rlockCommitTs =
-
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
- if (rlockCommitTs > snaptime) {
+ if (readlock) {
+ source.skipToPrefix(seekRange.getStartKey(), ColumnType.LOCK);
+ } else {
+ source.skipToPrefix(seekRange.getStartKey(), ColumnType.RLOCK);
+ }
+ break;
+ }
+ case RLOCK: {
+ long lastDeleteTs = -1;
+ long rlts = ReadLockUtil.decodeTs(ts);
+
+ if (!readlock) {
+ while (rlts > invalidationTime && colType == ColumnType.RLOCK) {
+ if (ReadLockUtil.isDelete(ts)) {
+ // ignore rolled back read locks, these should never prevent a
write lock
+ if (!DelReadLockValue.isRollback(source.getTopValue().get())) {
+ if (rlts >= snaptime) {
hasTop = true;
return;
+ } else {
+ long rlockCommitTs =
+
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+ if (rlockCommitTs > snaptime) {
+ hasTop = true;
+ return;
+ }
}
}
- }
- lastDeleteTs = rlts;
- } else {
- if (rlts != lastDeleteTs) {
- // this read lock is active
- hasTop = true;
- return;
+ lastDeleteTs = rlts;
+ } else {
+ if (rlts != lastDeleteTs) {
+ // this read lock is active
+ hasTop = true;
+ return;
+ }
+ }
+
+ source.next();
+ if (source.hasTop()) {
+ colType = ColumnType.from(source.getTopKey());
+ ts = source.getTopKey().getTimestamp() &
ColumnConstants.TIMESTAMP_MASK;
+ rlts = ReadLockUtil.decodeTs(ts);
+ } else {
+ break;
}
}
+ }
- source.next();
- if (source.hasTop()) {
- colType = source.getTopKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
- ts = source.getTopKey().getTimestamp() &
ColumnConstants.TIMESTAMP_MASK;
- rlts = ReadLockUtil.decodeTs(ts);
- } else {
- break;
- }
+ if (source.hasTop() && (colType == ColumnType.RLOCK)) {
+ source.skipToPrefix(seekRange.getStartKey(), ColumnType.LOCK);
}
+ break;
}
+ case LOCK: {
+ if (ts > invalidationTime) {
+ // nothing supersedes this lock, therefore the column is locked
+ hasTop = true;
+ return;
+ }
- if (source.hasTop() && (colType == ColumnConstants.RLOCK_PREFIX)) {
- source.skipToPrefix(seekRange.getStartKey(),
ColumnConstants.LOCK_PREFIX);
- }
- } else if (colType == ColumnConstants.LOCK_PREFIX) {
- if (ts > invalidationTime) {
- // nothing supersedes this lock, therefore the column is locked
- hasTop = true;
- return;
+ if (checkAck) {
+ source.skipToPrefix(seekRange.getStartKey(), ColumnType.ACK);
+ } else {
+ // only ack and data left and not interested in either so stop
looking
+ return;
+ }
+ break;
}
-
- if (checkAck) {
- source.skipToPrefix(seekRange.getStartKey(),
ColumnConstants.ACK_PREFIX);
- } else {
- // only ack and data left and not interested in either so stop
looking
+ case DATA: {
+ // can stop looking
return;
}
- } else if (colType == ColumnConstants.DATA_PREFIX) {
- // can stop looking
- return;
- } else if (colType == ColumnConstants.ACK_PREFIX) {
- if (checkAck && ts > ntfyTimestamp) {
- hasTop = true;
- return;
- } else {
- // nothing else to look at in this column
- return;
+ case ACK: {
+ if (checkAck && ts > ntfyTimestamp) {
+ hasTop = true;
+ return;
+ } else {
+ // nothing else to look at in this column
+ return;
+ }
}
- } else {
- throw new IllegalArgumentException();
+ default:
+ throw new IllegalArgumentException();
}
}
}
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
index cae383f..7df373f 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.values.WriteValue;
public class RollbackCheckIterator implements SortedKeyValueIterator<Key,
Value> {
@@ -91,60 +92,69 @@ public class RollbackCheckIterator implements
SortedKeyValueIterator<Key, Value>
hasTop = false;
while (source.hasTop()
&& curCol.equals(source.getTopKey(),
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
- long colType = source.getTopKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
+ ColumnType colType = ColumnType.from(source.getTopKey());
long ts = source.getTopKey().getTimestamp() &
ColumnConstants.TIMESTAMP_MASK;
- if (colType == ColumnConstants.TX_DONE_PREFIX) {
- source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX);
- continue;
- } else if (colType == ColumnConstants.WRITE_PREFIX) {
- long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
-
- if (timePtr > invalidationTime) {
- invalidationTime = timePtr;
+ switch (colType) {
+ case TX_DONE:
+ source.skipToPrefix(curCol, ColumnType.WRITE);
+ continue;
+ case WRITE: {
+ long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
+
+ if (timePtr > invalidationTime) {
+ invalidationTime = timePtr;
+ }
+
+ if (lockTime == timePtr) {
+ hasTop = true;
+ return;
+ }
+
+ if (lockTime > timePtr) {
+ source.skipToPrefix(curCol, ColumnType.DEL_LOCK);
+ continue;
+ }
+ break;
}
-
- if (lockTime == timePtr) {
- hasTop = true;
- return;
+ case DEL_LOCK: {
+ if (ts > invalidationTime) {
+ invalidationTime = ts;
+ }
+
+ if (ts == lockTime) {
+ hasTop = true;
+ return;
+ }
+
+ if (lockTime > ts) {
+ source.skipToPrefix(curCol, ColumnType.LOCK);
+ continue;
+ }
+ break;
}
-
- if (lockTime > timePtr) {
- source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX);
+ case RLOCK: {
+ source.skipToPrefix(curCol, ColumnType.LOCK);
continue;
}
-
- } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
- if (ts > invalidationTime) {
- invalidationTime = ts;
+ case LOCK: {
+ if (ts > invalidationTime) {
+ // nothing supersedes this lock, therefore the column is locked
+ hasTop = true;
+ return;
+ }
+ break;
}
-
- if (ts == lockTime) {
- hasTop = true;
+ case DATA: {
+ // can stop looking
return;
}
-
- if (lockTime > ts) {
- source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
- continue;
- }
-
- } else if (colType == ColumnConstants.RLOCK_PREFIX) {
- source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
- continue;
- } else if (colType == ColumnConstants.LOCK_PREFIX) {
- if (ts > invalidationTime) {
- // nothing supersedes this lock, therefore the column is locked
- hasTop = true;
- return;
+ case ACK: {
+ // do nothing if ACK
+ break;
}
- } else if (colType == ColumnConstants.DATA_PREFIX) {
- // can stop looking
- return;
- } else if (colType == ColumnConstants.ACK_PREFIX) {
- // do nothing if ACK
- } else {
- throw new IllegalArgumentException();
+ default:
+ throw new IllegalArgumentException();
}
source.next();
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
index 27e63ef..4b5ab9d 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.values.WriteValue;
public class SnapshotIterator implements SortedKeyValueIterator<Key, Value> {
@@ -91,87 +92,95 @@ public class SnapshotIterator implements
SortedKeyValueIterator<Key, Value> {
while (source.hasTop()
&& curCol.equals(source.getTopKey(),
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
- long colType = source.getTopKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
+ ColumnType colType = ColumnType.from(source.getTopKey());
long ts = source.getTopKey().getTimestamp() &
ColumnConstants.TIMESTAMP_MASK;
- if (colType == ColumnConstants.TX_DONE_PREFIX) {
- source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX);
- continue;
- } else if (colType == ColumnConstants.WRITE_PREFIX) {
- long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
-
- if (timePtr > invalidationTime) {
- invalidationTime = timePtr;
+ switch (colType) {
+ case TX_DONE: {
+ source.skipToPrefix(curCol, ColumnType.WRITE);
+ continue;
}
+ case WRITE: {
+ long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
- if (dataPointer == -1) {
- if (ts <= snaptime) {
- dataPointer = timePtr;
- source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX);
- continue;
+ if (timePtr > invalidationTime) {
+ invalidationTime = timePtr;
+ }
+
+ if (dataPointer == -1) {
+ if (ts <= snaptime) {
+ dataPointer = timePtr;
+ source.skipToPrefix(curCol, ColumnType.DEL_LOCK);
+ continue;
+ } else {
+ source.skipToTimestamp(curCol,
ColumnType.WRITE.enode(snaptime));
+ continue;
+ }
+ }
+ break;
+ }
+ case DEL_LOCK: {
+ if (ts > invalidationTime) {
+ invalidationTime = ts;
+ }
+ if (returnReadLockPresent) {
+ source.skipToPrefix(curCol, ColumnType.RLOCK);
} else {
- source.skipToTimestamp(curCol, ColumnConstants.WRITE_PREFIX |
snaptime);
- continue;
+ source.skipToPrefix(curCol, ColumnType.LOCK);
}
+ continue;
}
- } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
- if (ts > invalidationTime) {
- invalidationTime = ts;
+ case RLOCK: {
+ if (returnReadLockPresent) {
+ rememberReadLock(source.getTopKey(), source.getTopValue());
+ }
+
+ source.skipToPrefix(curCol, ColumnType.LOCK);
+ continue;
}
- if (returnReadLockPresent) {
- source.skipToPrefix(curCol, ColumnConstants.RLOCK_PREFIX);
- } else {
- source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
+ case LOCK: {
+ if (ts > invalidationTime && ts <= snaptime) {
+ // nothing supersedes this lock, therefore the column is locked
+ return;
+ } else {
+ if (dataPointer == -1) {
+ source.skipColumn(curCol);
+ continue outer;
+ } else {
+ source.skipToTimestamp(curCol,
ColumnType.DATA.enode(dataPointer));
+ continue;
+ }
+ }
}
- continue;
+ case DATA: {
+ if (dataPointer == ts) {
+ // found data for this column
+ return;
+ }
- } else if (colType == ColumnConstants.RLOCK_PREFIX) {
- if (returnReadLockPresent) {
- rememberReadLock(source.getTopKey(), source.getTopValue());
- }
+ if (ts < dataPointer || dataPointer == -1) {
+ source.skipColumn(curCol);
+ continue outer;
+ }
- source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
- continue;
- } else if (colType == ColumnConstants.LOCK_PREFIX) {
- if (ts > invalidationTime && ts <= snaptime) {
- // nothing supersedes this lock, therefore the column is locked
- return;
- } else {
+ if (ts > dataPointer) {
+ source.skipToTimestamp(curCol,
ColumnType.DATA.enode(dataPointer));
+ continue;
+ }
+ break;
+ }
+ case ACK: {
if (dataPointer == -1) {
source.skipColumn(curCol);
continue outer;
} else {
- source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX |
dataPointer);
+ source.skipToTimestamp(curCol,
ColumnType.DATA.enode(dataPointer));
continue;
}
}
- } else if (colType == ColumnConstants.DATA_PREFIX) {
- if (dataPointer == ts) {
- // found data for this column
- return;
- }
-
- if (ts < dataPointer || dataPointer == -1) {
- source.skipColumn(curCol);
- continue outer;
- }
-
- if (ts > dataPointer) {
- source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX |
dataPointer);
- continue;
- }
- } else if (colType == ColumnConstants.ACK_PREFIX) {
- if (dataPointer == -1) {
- source.skipColumn(curCol);
- continue outer;
- } else {
- source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX |
dataPointer);
- continue;
- }
- } else {
- throw new IllegalArgumentException();
+ default:
+ throw new IllegalArgumentException();
}
-
// TODO handle case where dataPointer >=0, but no data was found
source.next();
}
@@ -220,8 +229,7 @@ public class SnapshotIterator implements
SortedKeyValueIterator<Key, Value> {
if (range.getStartKey() != null && range.getStartKey().getTimestamp() !=
Long.MAX_VALUE
&& !range.isStartKeyInclusive()) {
- if ((range.getStartKey().getTimestamp()
- & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+ if (ColumnType.from(range.getStartKey()) == ColumnType.RLOCK) {
Key currCol = new Key(range.getStartKey());
currCol.setTimestamp(Long.MAX_VALUE);
newRange = new Range(currCol, true, range.getEndKey(),
range.isEndKeyInclusive());
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
index 3279ec2..534c82d 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
/**
* The purpose of this iterator is to make seeking within a columns timestamp
range efficient.
@@ -80,10 +80,8 @@ public class TimestampSkippingIterator implements
SortedKeyValueIterator<Key, Va
}
}
- public void skipToPrefix(Key curCol, long prefix) throws IOException {
- // first possible timestamp in sorted order for this prefix
- long timestamp = prefix | ColumnConstants.TIMESTAMP_MASK;
- skipToTimestamp(curCol, timestamp);
+ public void skipToPrefix(Key curCol, ColumnType colType) throws IOException {
+ skipToTimestamp(curCol, colType.first());
}
public void skipColumn(Key curCol) throws IOException {
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
index 7063adc..16b9f1e 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
@@ -21,20 +21,11 @@ import org.apache.fluo.api.data.Bytes;
* Constants used extract data from columns
*/
public class ColumnConstants {
-
- public static final long PREFIX_MASK = 0xe000000000000000L;
- public static final long TX_DONE_PREFIX = 0x6000000000000000L;
- public static final long WRITE_PREFIX = 0x4000000000000000L;
- public static final long DEL_LOCK_PREFIX = 0x2000000000000000L;
- public static final long RLOCK_PREFIX = 0x0000000000000000L;
- public static final long LOCK_PREFIX = 0xe000000000000000L;
- public static final long ACK_PREFIX = 0xc000000000000000L;
- public static final long DATA_PREFIX = 0xa000000000000000L;
- public static final long TIMESTAMP_MASK = 0x1fffffffffffffffL;
+ public static final long PREFIX_MASK = -1L << (64 - ColumnType.BITS);
+ public static final long TIMESTAMP_MASK = -1L >>> ColumnType.BITS;
public static final Bytes NOTIFY_CF = Bytes.of("ntfy");
public static final String NOTIFY_LOCALITY_GROUP_NAME = "notify";
public static final Bytes GC_CF = Bytes.of("gc");
private ColumnConstants() {}
-
}
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java
new file mode 100644
index 0000000..b16fe6d
--- /dev/null
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.accumulo.core.data.Key;
+
+/**
+ * Abstracts how the Fluo column type is encoded in Accumulo timestamps.
+ */
+public enum ColumnType {
+
+ TX_DONE, WRITE, DEL_LOCK, RLOCK, LOCK, ACK, DATA;
+
+ private long prefix;
+
+ /**
+ * @return The first possible timestamp in sorted order.
+ */
+ public long first() {
+ return prefix | ColumnConstants.TIMESTAMP_MASK;
+ }
+
+ /**
+ * @return The timestamp with this column type encoded into the high order
bits.
+ */
+ public long enode(long timestamp) {
+ Preconditions.checkArgument((timestamp >>> (64 - BITS)) == 0);
+ return prefix | timestamp;
+ }
+
+ // The number of leftmost bits in in the timestamp reserved for encoding the
column type
+ static final int BITS = 3;
+ private static final byte TX_DONE_PREFIX = 0x03;
+ private static final byte WRITE_PREFIX = 0x02;
+ private static final byte DEL_LOCK_PREFIX = 0x01;
+ private static final byte RLOCK_PREFIX = 0x00;
+ private static final byte LOCK_PREFIX = 0x07;
+ private static final byte ACK_PREFIX = 0x06;
+ private static final byte DATA_PREFIX = 0x05;
+
+ static {
+ TX_DONE.prefix = (long) TX_DONE_PREFIX << (64 - BITS);
+ WRITE.prefix = (long) WRITE_PREFIX << (64 - BITS);
+ DEL_LOCK.prefix = (long) DEL_LOCK_PREFIX << (64 - BITS);
+ RLOCK.prefix = (long) RLOCK_PREFIX << (64 - BITS);
+ LOCK.prefix = (long) LOCK_PREFIX << (64 - BITS);
+ ACK.prefix = (long) ACK_PREFIX << (64 - BITS);
+ DATA.prefix = (long) DATA_PREFIX << (64 - BITS);
+ }
+
+ public static ColumnType from(Key k) {
+ return from(k.getTimestamp());
+ }
+
+ public static ColumnType from(long timestamp) {
+ byte prefix = (byte) (timestamp >>> (64 - BITS));
+ switch (prefix) {
+ case TX_DONE_PREFIX:
+ return TX_DONE;
+ case WRITE_PREFIX:
+ return WRITE;
+ case DEL_LOCK_PREFIX:
+ return DEL_LOCK;
+ case RLOCK_PREFIX:
+ return RLOCK;
+ case LOCK_PREFIX:
+ return LOCK;
+ case ACK_PREFIX:
+ return ACK;
+ case DATA_PREFIX:
+ return DATA;
+ default:
+ throw new IllegalArgumentException("Unknown prefix : " +
Integer.toHexString(prefix));
+ }
+ }
+}
diff --git
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
index 417fd81..7358e1e 100644
---
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
+++
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.SortedMapIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.junit.Assert;
import org.junit.Test;
@@ -320,6 +320,6 @@ public class SnapshotIteratorTest {
@Test(expected = IllegalArgumentException.class)
public void testNonZeroPrefix() {
- SnapshotIterator.setSnaptime(null, ColumnConstants.DATA_PREFIX | 6);
+ SnapshotIterator.setSnaptime(null, ColumnType.DATA.enode(6));
}
}
diff --git
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
index dbdcd1b..ad595e0 100644
---
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
+++
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.accumulo.iterators.CountingIterator.Counter;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -101,28 +101,28 @@ public class TestData {
switch (ct) {
case "ACK":
- ts |= ColumnConstants.ACK_PREFIX;
+ ts = ColumnType.ACK.enode(ts);
break;
case "TX_DONE":
- ts |= ColumnConstants.TX_DONE_PREFIX;
+ ts = ColumnType.TX_DONE.enode(ts);
break;
case "WRITE":
- ts |= ColumnConstants.WRITE_PREFIX;
+ ts = ColumnType.WRITE.enode(ts);
long writeTs = Long.parseLong(value.split("\\s+")[0]);
val = WriteValue.encode(writeTs, value.contains("PRIMARY"),
value.contains("DELETE"));
break;
case "LOCK":
- ts |= ColumnConstants.LOCK_PREFIX;
+ ts = ColumnType.LOCK.enode(ts);
String rc[] = value.split("\\s+");
val = LockValue.encode(Bytes.of(rc[0]), new Column(rc[1], rc[2]),
value.contains("WRITE"),
value.contains("DELETE"), value.contains("TRIGGER"), 42l);
break;
case "DATA":
- ts |= ColumnConstants.DATA_PREFIX;
+ ts = ColumnType.DATA.enode(ts);
val = value.getBytes();
break;
case "DEL_LOCK":
- ts |= ColumnConstants.DEL_LOCK_PREFIX;
+ ts = ColumnType.DEL_LOCK.enode(ts);
if (value.contains("ROLLBACK") || value.contains("ABORT")) {
val = DelLockValue.encodeRollback(value.contains("PRIMARY"), true);
} else {
@@ -132,11 +132,11 @@ public class TestData {
break;
case "RLOCK":
ts = ReadLockUtil.encodeTs(ts, false);
- ts |= ColumnConstants.RLOCK_PREFIX;
+ ts = ColumnType.RLOCK.enode(ts);
break;
case "DEL_RLOCK":
ts = ReadLockUtil.encodeTs(ts, true);
- ts |= ColumnConstants.RLOCK_PREFIX;
+ ts = ColumnType.RLOCK.enode(ts);
if (value.contains("ROLLBACK") || value.contains("ABORT")) {
val = DelReadLockValue.encodeRollback();
diff --git
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java
new file mode 100644
index 0000000..3b0b09b
--- /dev/null
+++
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.util;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import org.apache.accumulo.core.data.Key;
+import org.junit.Test;
+
+import static org.apache.fluo.accumulo.util.ColumnType.ACK;
+import static org.apache.fluo.accumulo.util.ColumnType.DATA;
+import static org.apache.fluo.accumulo.util.ColumnType.DEL_LOCK;
+import static org.apache.fluo.accumulo.util.ColumnType.LOCK;
+import static org.apache.fluo.accumulo.util.ColumnType.RLOCK;
+import static org.apache.fluo.accumulo.util.ColumnType.TX_DONE;
+import static org.apache.fluo.accumulo.util.ColumnType.WRITE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnTypeTest {
+
+ private static final long TIMESTAMP_MASK = 0x1fffffffffffffffL;
+
+
+ // map of expected prefixes for a column type
+ private static final Map<Long, ColumnType> EPM;
+
+ static {
+ Builder<Long, ColumnType> builder = ImmutableMap.builder();
+ builder.put(0x6000000000000000L, TX_DONE);
+ builder.put(0x4000000000000000L, WRITE);
+ builder.put(0x2000000000000000L, DEL_LOCK);
+ builder.put(0x0000000000000000L, RLOCK);
+ builder.put(0xe000000000000000L, LOCK);
+ builder.put(0xc000000000000000L, ACK);
+ builder.put(0xa000000000000000L, DATA);
+ EPM = builder.build();
+ }
+
+ @Test
+ public void testPrefix() {
+ for (long l : new long[] {0, 2, 13, 19 * 19L, 1L << 50, 1L << 50 + 1L <<
48}) {
+ EPM.forEach((prefix, colType) -> assertEquals(prefix | l,
colType.enode(l)));
+ }
+ }
+
+ @Test
+ public void testFirst() {
+ EPM.forEach((prefix, colType) -> assertEquals(prefix | TIMESTAMP_MASK,
colType.first()));
+ for (long l : new long[] {0, 2, 13, 19 * 19L, 1L << 50, 1L << 50 + 1L <<
48}) {
+ EPM.forEach((prefix, colType) -> {
+ Key k1 = new Key("r", "f", "q");
+ k1.setTimestamp(prefix | l);
+ Key k2 = new Key("r", "f", "q");
+ k2.setTimestamp(colType.first());
+ assertTrue(k1.compareTo(k2) > 0);
+ });
+ }
+ }
+
+ @Test
+ public void testFrom() {
+ for (long l : new long[] {0, 2, 13, 19 * 19L, 1L << 50, 1L << 50 + 1L <<
48}) {
+ EPM.forEach((prefix, colType) -> {
+ assertEquals(ColumnType.from(prefix | l), colType);
+ Key k = new Key("r", "f", "q");
+ k.setTimestamp(prefix | l);
+ assertEquals(ColumnType.from(k), colType);
+ });
+ }
+ }
+
+ @Test
+ public void testCoverage() {
+ EnumSet<ColumnType> expected = EnumSet.allOf(ColumnType.class);
+ HashSet<ColumnType> actual = new HashSet<>(EPM.values());
+ assertEquals(expected, actual);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index ea47bbf..54d2fa0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.iterators.OpenReadLockIterator;
import org.apache.fluo.accumulo.iterators.PrewriteIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -50,7 +51,6 @@ import org.apache.fluo.core.util.ConditionalFlutation;
import org.apache.fluo.core.util.FluoCondition;
import org.apache.fluo.core.util.SpanUtil;
-import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
/**
@@ -99,7 +99,7 @@ public class LockResolver {
public LockInfo(Entry<Key, Value> kve) {
long rawTs = kve.getKey().getTimestamp();
this.entry = kve;
- if ((rawTs & ColumnConstants.PREFIX_MASK) ==
ColumnConstants.RLOCK_PREFIX) {
+ if (ColumnType.from(rawTs) == ColumnType.RLOCK) {
this.lockTs = ReadLockUtil.decodeTs(rawTs);
ReadLockValue rlv = new ReadLockValue(kve.getValue().get());
this.prow = rlv.getPrimaryRow();
@@ -221,11 +221,11 @@ public class LockResolver {
if (lockInfo.isReadLock) {
mut.put(k.getColumnFamilyData().toArray(),
k.getColumnQualifierData().toArray(),
k.getColumnVisibilityParsed(),
- ColumnConstants.RLOCK_PREFIX |
ReadLockUtil.encodeTs(lockInfo.lockTs, true),
+ ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(lockInfo.lockTs,
true)),
DelReadLockValue.encodeRollback());
} else {
mut.put(k.getColumnFamilyData().toArray(),
k.getColumnQualifierData().toArray(),
- k.getColumnVisibilityParsed(), ColumnConstants.DEL_LOCK_PREFIX |
lockInfo.lockTs,
+ k.getColumnVisibilityParsed(),
ColumnType.DEL_LOCK.enode(lockInfo.lockTs),
DelLockValue.encodeRollback(false, true));
}
}
@@ -241,7 +241,7 @@ public class LockResolver {
ConditionalFlutation delLockMutation = new ConditionalFlutation(env,
prc.prow,
new FluoCondition(env,
prc.pcol).setIterators(iterConf).setValue(lockValue));
- delLockMutation.put(prc.pcol, ColumnConstants.DEL_LOCK_PREFIX |
prc.startTs,
+ delLockMutation.put(prc.pcol, ColumnType.DEL_LOCK.enode(prc.startTs),
DelLockValue.encodeRollback(true, true));
ConditionalWriter cw = null;
@@ -312,7 +312,7 @@ public class LockResolver {
for (Column col : e1.getValue()) {
Key start = SpanUtil.toKey(new RowColumn(e1.getKey(), col));
Key end = new Key(start);
- end.setTimestamp(ColumnConstants.LOCK_PREFIX |
ColumnConstants.TIMESTAMP_MASK);
+ end.setTimestamp(ColumnType.LOCK.first());
ranges.add(new Range(start, true, end, false));
}
}
@@ -329,7 +329,7 @@ public class LockResolver {
List<Entry<Key, Value>> ret = new ArrayList<>();
for (Entry<Key, Value> entry : bscanner) {
- if ((entry.getKey().getTimestamp() & PREFIX_MASK) ==
ColumnConstants.RLOCK_PREFIX) {
+ if (ColumnType.from(entry.getKey()) == ColumnType.RLOCK) {
ret.add(entry);
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index f9f1e85..37dc45b 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -32,7 +32,7 @@ import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
@@ -176,16 +176,20 @@ public class ParallelSnapshotScanner {
Bytes row = rowConverter.apply(entry.getKey().getRowData());
Column col = columnConverter.apply(entry.getKey());
- long colType = entry.getKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
-
- if (colType == ColumnConstants.LOCK_PREFIX) {
- locks.add(entry);
- } else if (colType == ColumnConstants.DATA_PREFIX) {
- ret.computeIfAbsent(row, k -> new HashMap<>()).put(col,
Bytes.of(entry.getValue().get()));
- } else if (colType == ColumnConstants.RLOCK_PREFIX) {
- readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
- } else {
- throw new IllegalArgumentException("Unexpected column type " +
colType);
+ ColumnType colType = ColumnType.from(entry.getKey());
+ switch (colType) {
+ case LOCK:
+ locks.add(entry);
+ break;
+ case DATA:
+ ret.computeIfAbsent(row, k -> new HashMap<>()).put(col,
+ Bytes.of(entry.getValue().get()));
+ break;
+ case RLOCK:
+ readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected column type " +
colType);
}
}
} finally {
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index 5db7602..f8f83f7 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -31,7 +31,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.iterators.SnapshotIterator;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
@@ -172,9 +172,7 @@ public class SnapshotScanner implements Iterable<Entry<Key,
Value>> {
while (iterator.hasNext()) {
Entry<Key, Value> entry = iterator.next();
- long colType = entry.getKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
-
- if (colType == ColumnConstants.LOCK_PREFIX) {
+ if (ColumnType.from(entry.getKey()) == ColumnType.LOCK) {
locks.add(entry);
locksSeen.accept(lockEntry);
}
@@ -220,18 +218,19 @@ public class SnapshotScanner implements
Iterable<Entry<Key, Value>> {
Entry<Key, Value> entry = iterator.next();
- long colType = entry.getKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
-
- if (colType == ColumnConstants.LOCK_PREFIX) {
- resolveLock(entry);
- continue mloop;
- } else if (colType == ColumnConstants.DATA_PREFIX) {
- stats.incrementEntriesReturned(1);
- return entry;
- } else if (colType == ColumnConstants.RLOCK_PREFIX) {
- return entry;
- } else {
- throw new IllegalArgumentException("Unexpected column type " +
colType);
+ ColumnType colType = ColumnType.from(entry.getKey());
+
+ switch (colType) {
+ case LOCK:
+ resolveLock(entry);
+ continue mloop;
+ case DATA:
+ stats.incrementEntriesReturned(1);
+ return entry;
+ case RLOCK:
+ return entry;
+ default:
+ throw new IllegalArgumentException("Unexpected column type " +
colType);
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index ae87a24..3cbbe5c 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.iterators.PrewriteIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -87,8 +88,6 @@ import org.apache.fluo.core.util.UtilWaitThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
-import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
@@ -278,7 +277,7 @@ public class TransactionImpl extends
AbstractTransactionBase implements AsyncTra
continue;
}
- if ((kve.getKey().getTimestamp() & PREFIX_MASK) == RLOCK_PREFIX) {
+ if (ColumnType.from(kve.getKey()) == ColumnType.RLOCK) {
if (readLockCols == null) {
readLockCols = readLocksSeen.computeIfAbsent(row, k -> new
HashSet<>());
}
@@ -407,14 +406,14 @@ public class TransactionImpl extends
AbstractTransactionBase implements AsyncTra
}
if (isWrite(val) && !isDelete(val)) {
- cm.put(col, ColumnConstants.DATA_PREFIX | startTs, val.toArray());
+ cm.put(col, ColumnType.DATA.enode(startTs), val.toArray());
}
if (isReadLock(val)) {
- cm.put(col, ColumnConstants.RLOCK_PREFIX |
ReadLockUtil.encodeTs(startTs, false),
+ cm.put(col, ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs,
false)),
ReadLockValue.encode(primaryRow, primaryColumn, getTransactorID()));
} else {
- cm.put(col, ColumnConstants.LOCK_PREFIX | startTs,
LockValue.encode(primaryRow, primaryColumn,
+ cm.put(col, ColumnType.LOCK.enode(startTs), LockValue.encode(primaryRow,
primaryColumn,
isWrite(val), isDelete(val), isTriggerRow, getTransactorID()));
}
@@ -668,11 +667,10 @@ public class TransactionImpl extends
AbstractTransactionBase implements AsyncTra
if (notification.getColumn().equals(col)) {
// check to see if ACK exist after notification
Key startKey = SpanUtil.toKey(notification.getRowColumn());
- startKey.setTimestamp(
- ColumnConstants.ACK_PREFIX | (Long.MAX_VALUE &
ColumnConstants.TIMESTAMP_MASK));
+ startKey.setTimestamp(ColumnType.ACK.first());
Key endKey = SpanUtil.toKey(notification.getRowColumn());
- endKey.setTimestamp(ColumnConstants.ACK_PREFIX |
(notification.getTimestamp() + 1));
+ endKey.setTimestamp(ColumnType.ACK.enode(notification.getTimestamp()
+ 1));
Range range = new Range(startKey, endKey);
@@ -1112,11 +1110,10 @@ public class TransactionImpl extends
AbstractTransactionBase implements AsyncTra
m = new Flutation(env, row);
for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
if (isReadLock(entry.getValue())) {
- m.put(entry.getKey(),
- ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs,
true),
+ m.put(entry.getKey(),
ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, true)),
DelReadLockValue.encodeRollback());
} else {
- m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
+ m.put(entry.getKey(), ColumnType.DEL_LOCK.enode(startTs),
DelLockValue.encodeRollback(false, true));
}
}
@@ -1134,9 +1131,9 @@ public class TransactionImpl extends
AbstractTransactionBase implements AsyncTra
// mark transaction as complete for garbage collection purposes
Flutation m = new Flutation(env, cd.prow);
- m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+ m.put(cd.pcol, ColumnType.DEL_LOCK.enode(startTs),
DelLockValue.encodeRollback(startTs, true, true));
- m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
+ m.put(cd.pcol, ColumnType.TX_DONE.enode(startTs), EMPTY);
return Collections.singletonList(m);
}
@@ -1392,7 +1389,7 @@ public class TransactionImpl extends
AbstractTransactionBase implements AsyncTra
Flutation m = new Flutation(env, cd.prow);
// mark transaction as complete for garbage collection purposes
- m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY);
+ m.put(cd.pcol, ColumnType.TX_DONE.enode(commitTs), EMPTY);
afterFlushMutations.add(m);
if (weakNotification != null) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
index 849e2b5..2b6126d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -22,6 +22,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.iterators.RollbackCheckIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.WriteValue;
import org.apache.fluo.api.data.Bytes;
@@ -51,43 +52,50 @@ public class TxInfo {
return txInfo;
}
- long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+ ColumnType colType = ColumnType.from(entry.getKey());
long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
- if (colType == ColumnConstants.LOCK_PREFIX) {
- if (ts == startTs) {
- txInfo.status = TxStatus.LOCKED;
- txInfo.lockValue = entry.getValue().get();
- } else {
- txInfo.status = TxStatus.UNKNOWN; // locked by another tx
+ switch (colType) {
+ case LOCK: {
+ if (ts == startTs) {
+ txInfo.status = TxStatus.LOCKED;
+ txInfo.lockValue = entry.getValue().get();
+ } else {
+ txInfo.status = TxStatus.UNKNOWN; // locked by another tx
+ }
+ break;
}
- } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
- DelLockValue dlv = new DelLockValue(entry.getValue().get());
-
- if (ts != startTs) {
- // expect this to always be false, must be a bug in the iterator
- throw new IllegalStateException(prow + " " + pcol + " (" + ts + " != "
+ startTs + ") ");
+ case DEL_LOCK: {
+ DelLockValue dlv = new DelLockValue(entry.getValue().get());
+
+ if (ts != startTs) {
+ // expect this to always be false, must be a bug in the iterator
+ throw new IllegalStateException(prow + " " + pcol + " (" + ts + " !=
" + startTs + ") ");
+ }
+
+ if (dlv.isRollback()) {
+ txInfo.status = TxStatus.ROLLED_BACK;
+ } else {
+ txInfo.status = TxStatus.COMMITTED;
+ txInfo.commitTs = dlv.getCommitTimestamp();
+ }
+ break;
}
+ case WRITE: {
+ long timePtr = WriteValue.getTimestamp(entry.getValue().get());
- if (dlv.isRollback()) {
- txInfo.status = TxStatus.ROLLED_BACK;
- } else {
- txInfo.status = TxStatus.COMMITTED;
- txInfo.commitTs = dlv.getCommitTimestamp();
- }
- } else if (colType == ColumnConstants.WRITE_PREFIX) {
- long timePtr = WriteValue.getTimestamp(entry.getValue().get());
+ if (timePtr != startTs) {
+ // expect this to always be false, must be a bug in the iterator
+ throw new IllegalStateException(
+ prow + " " + pcol + " (" + timePtr + " != " + startTs + ") ");
+ }
- if (timePtr != startTs) {
- // expect this to always be false, must be a bug in the iterator
- throw new IllegalStateException(
- prow + " " + pcol + " (" + timePtr + " != " + startTs + ") ");
+ txInfo.status = TxStatus.COMMITTED;
+ txInfo.commitTs = ts;
+ break;
}
-
- txInfo.status = TxStatus.COMMITTED;
- txInfo.commitTs = ts;
- } else {
- throw new IllegalStateException("unexpected col type returned " +
colType);
+ default:
+ throw new IllegalStateException("unexpected col type returned " +
colType);
}
return txInfo;
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index bbf1d83..9ea5130 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.DelReadLockValue;
@@ -56,19 +56,18 @@ public class ColumnUtil {
boolean isWrite, boolean isDelete, boolean isReadlock, long startTs,
long commitTs,
Set<Column> observedColumns, Mutation m) {
if (isReadlock) {
- Flutation.put(env, m, col,
- ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+ Flutation.put(env, m, col,
ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, true)),
DelReadLockValue.encodeCommit(commitTs));
} else if (isWrite) {
- Flutation.put(env, m, col, ColumnConstants.WRITE_PREFIX | commitTs,
+ Flutation.put(env, m, col, ColumnType.WRITE.enode(commitTs),
WriteValue.encode(startTs, isPrimary, isDelete));
} else {
- Flutation.put(env, m, col, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+ Flutation.put(env, m, col, ColumnType.DEL_LOCK.enode(startTs),
DelLockValue.encodeCommit(commitTs, isPrimary));
}
if (isTrigger) {
- Flutation.put(env, m, col, ColumnConstants.ACK_PREFIX | startTs,
TransactionImpl.EMPTY);
+ Flutation.put(env, m, col, ColumnType.ACK.enode(startTs),
TransactionImpl.EMPTY);
}
}
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
index 546cf35..3b475bf 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.LongUtil;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
@@ -638,12 +639,12 @@ public class FailureIT extends ITBaseImpl {
Scanner scanner = aClient.createScanner(getCurTableName(),
Authorizations.EMPTY);
for (Entry<Key, Value> entry : scanner) {
- long colType = entry.getKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
+ ColumnType colType = ColumnType.from(entry.getKey());
long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
String row = entry.getKey().getRowData().toString();
byte[] val = entry.getValue().get();
- if (row.equals(rolledBackRow) && colType ==
ColumnConstants.DEL_LOCK_PREFIX && ts == startTs
+ if (row.equals(rolledBackRow) && colType == ColumnType.DEL_LOCK && ts ==
startTs
&& DelLockValue.isPrimary(val)) {
sawExpected = true;
}
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
index b99b16d..03ff986 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.api.client.TransactionBase;
@@ -308,10 +309,10 @@ public class GarbageCollectionIteratorIT extends
ITBaseImpl {
numWrites = 0;
}
- long colType = entry.getKey().getTimestamp() &
ColumnConstants.PREFIX_MASK;
+ ColumnType colType = ColumnType.from(entry.getKey());
long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
- if (colType == ColumnConstants.WRITE_PREFIX) {
+ if (colType == ColumnType.WRITE) {
numWrites++;
if (numWrites > 1) {
Assert.assertTrue("Extra write had ts " + ts + " < " + oldestTs, ts
>= oldestTs);
diff --git
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
index 1e959ff..3fa156c 100644
---
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
+++
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -19,7 +19,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.data.Key;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.values.WriteValue;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
@@ -45,14 +45,14 @@ import org.apache.hadoop.io.Text;
* FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
* // could also reuse column objects.
* Column column = new Column("fam1", "fam2");
- *
+ *
* fkvg.setRow("row1").setColumn(column).setValue("val2");
- *
+ *
* for (FluoKeyValue fluoKeyValue : fkvg.getKeyValues())
* writeToAccumuloFile(fluoKeyValue);
- *
+ *
* fkvg.setRow("row2").setColumn(column).setValue("val3");
- *
+ *
* // Each call to getKeyValues() returns the same objects populated with
different data when
* // possible. So subsequent calls to getKeyValues() will create less
objects. Of course this
* // invalidates what was returned by previous calls to getKeyValues().
@@ -187,11 +187,11 @@ public class FluoKeyValueGenerator {
*/
public FluoKeyValue[] getKeyValues() {
FluoKeyValue kv = keyVals[0];
- kv.setKey(new Key(row, fam, qual, vis, ColumnConstants.WRITE_PREFIX | 1));
+ kv.setKey(new Key(row, fam, qual, vis, ColumnType.WRITE.enode(1)));
kv.getValue().set(WriteValue.encode(0, false, false));
kv = keyVals[1];
- kv.setKey(new Key(row, fam, qual, vis, ColumnConstants.DATA_PREFIX | 0));
+ kv.setKey(new Key(row, fam, qual, vis, ColumnType.DATA.enode(0)));
kv.getValue().set(val);
return keyVals;
diff --git
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
index dd813ab..e09f20e 100644
---
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
+++
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -20,7 +20,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.values.WriteValue;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
@@ -83,9 +83,8 @@ public class FluoMutationGenerator {
}
public FluoMutationGenerator put(Column col, byte[] value) {
- Flutation.put(mutation, col, ColumnConstants.DATA_PREFIX | 0, value);
- Flutation.put(mutation, col, ColumnConstants.WRITE_PREFIX | 1,
- WriteValue.encode(0, false, false));
+ Flutation.put(mutation, col, ColumnType.DATA.enode(0), value);
+ Flutation.put(mutation, col, ColumnType.WRITE.enode(1),
WriteValue.encode(0, false, false));
return this;
}