http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java index 94f43dc,0000000..48ec06a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@@ -1,814 -1,0 +1,827 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.filter; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.transform.BasePartitions; +import org.apache.cassandra.db.transform.BaseRows; +import org.apache.cassandra.db.transform.StoppingTransformation; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * Object in charge of tracking whether we have fetched enough data for a given query. + * + * The reason this is not just a simple integer is that Thrift and CQL3 count + * results in different ways. This class abstracts those differences. + */ +public abstract class DataLimits +{ + public static final Serializer serializer = new Serializer(); + + public static final int NO_LIMIT = Integer.MAX_VALUE; + + public static final DataLimits NONE = new CQLLimits(NO_LIMIT) + { + @Override - public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec) ++ public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData) + { + return false; + } + + @Override - public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) ++ public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, ++ int nowInSec, ++ boolean countPartitionsWithOnlyStaticData) + { + return iter; + } + + @Override - public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec) ++ public UnfilteredRowIterator filter(UnfilteredRowIterator iter, ++ int nowInSec, ++ boolean countPartitionsWithOnlyStaticData) + { + return iter; + } + }; + + // We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per + // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering. 
+ public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true); + + public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT } + + public static DataLimits cqlLimits(int cqlRowLimit) + { + return new CQLLimits(cqlRowLimit); + } + + public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit) + { + return new CQLLimits(cqlRowLimit, perPartitionLimit); + } + + public static DataLimits distinctLimits(int cqlRowLimit) + { + return CQLLimits.distinct(cqlRowLimit); + } + + public static DataLimits thriftLimits(int partitionLimit, int cellPerPartitionLimit) + { + return new ThriftLimits(partitionLimit, cellPerPartitionLimit); + } + + public static DataLimits superColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit) + { + return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit); + } + + public abstract Kind kind(); + + public abstract boolean isUnlimited(); + public abstract boolean isDistinct(); + + public abstract DataLimits forPaging(int pageSize); + public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining); + + public abstract DataLimits forShortReadRetry(int toFetch); + - public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec); ++ public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData); + + /** + * Returns a new {@code Counter} for this limits. + * + * @param nowInSec the current time in second (to decide what is expired or not). + * @param assumeLiveData if true, the counter will assume that every row passed is live and won't + * thus check for liveness, otherwise it will. This should be {@code true} when used on a + * {@code RowIterator} (since it only returns live rows), false otherwise. ++ * @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be counted ++ * as 1 valid row. + * @return a new {@code Counter} for this limits. + */ - public abstract Counter newCounter(int nowInSec, boolean assumeLiveData); ++ public abstract Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData); + + /** + * The max number of results this limits enforces. + * <p> + * Note that the actual definition of "results" depends a bit: for CQL, it's always rows, but for + * thrift, it means cells. + * + * @return the maximum number of results this limits enforces. 
+ */ + public abstract int count(); + + public abstract int perPartitionCount(); + - public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) ++ public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, ++ int nowInSec, ++ boolean countPartitionsWithOnlyStaticData) + { - return this.newCounter(nowInSec, false).applyTo(iter); ++ return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter); + } + - public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec) ++ public UnfilteredRowIterator filter(UnfilteredRowIterator iter, ++ int nowInSec, ++ boolean countPartitionsWithOnlyStaticData) + { - return this.newCounter(nowInSec, false).applyTo(iter); ++ return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter); + } + - public PartitionIterator filter(PartitionIterator iter, int nowInSec) ++ public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData) + { - return this.newCounter(nowInSec, true).applyTo(iter); ++ return this.newCounter(nowInSec, true, countPartitionsWithOnlyStaticData).applyTo(iter); + } + + /** + * Estimate the number of results (the definition of "results" will be rows for CQL queries + * and partitions for thrift ones) that a full scan of the provided cfs would yield. + */ + public abstract float estimateTotalResults(ColumnFamilyStore cfs); + + public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>> + { + // false means we do not propagate our stop signals onto the iterator, we only count + private boolean enforceLimits = true; + + public Counter onlyCount() + { + this.enforceLimits = false; + return this; + } + + public PartitionIterator applyTo(PartitionIterator partitions) + { + return Transformation.apply(partitions, this); + } + + public UnfilteredPartitionIterator applyTo(UnfilteredPartitionIterator partitions) + { + return Transformation.apply(partitions, this); + } + + public UnfilteredRowIterator applyTo(UnfilteredRowIterator partition) + { + return (UnfilteredRowIterator) applyToPartition(partition); + } + + public RowIterator applyTo(RowIterator partition) + { + return (RowIterator) applyToPartition(partition); + } + + /** + * The number of results counted. + * <p> + * Note that the definition of "results" should be the same that for {@link #count}. + * + * @return the number of results counted. + */ + public abstract int counted(); + public abstract int countedInCurrentPartition(); + + public abstract boolean isDone(); + public abstract boolean isDoneForPartition(); + + @Override + protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition) + { + return partition instanceof UnfilteredRowIterator ? Transformation.apply((UnfilteredRowIterator) partition, this) + : Transformation.apply((RowIterator) partition, this); + } + + // called before we process a given partition + protected abstract void applyToPartition(DecoratedKey partitionKey, Row staticRow); + + @Override + protected void attachTo(BasePartitions partitions) + { + if (enforceLimits) + super.attachTo(partitions); + if (isDone()) + stop(); + } + + @Override + protected void attachTo(BaseRows rows) + { + if (enforceLimits) + super.attachTo(rows); + applyToPartition(rows.partitionKey(), rows.staticRow()); + if (isDoneForPartition()) + stopInPartition(); + } + } + + /** + * Limits used by CQL; this counts rows. 
+ */ + private static class CQLLimits extends DataLimits + { + protected final int rowLimit; + protected final int perPartitionLimit; + + // Whether the query is a distinct query or not. + protected final boolean isDistinct; + + private CQLLimits(int rowLimit) + { + this(rowLimit, NO_LIMIT); + } + + private CQLLimits(int rowLimit, int perPartitionLimit) + { + this(rowLimit, perPartitionLimit, false); + } + + private CQLLimits(int rowLimit, int perPartitionLimit, boolean isDistinct) + { + this.rowLimit = rowLimit; + this.perPartitionLimit = perPartitionLimit; + this.isDistinct = isDistinct; + } + + private static CQLLimits distinct(int rowLimit) + { + return new CQLLimits(rowLimit, 1, true); + } + + public Kind kind() + { + return Kind.CQL_LIMIT; + } + + public boolean isUnlimited() + { + return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT; + } + + public boolean isDistinct() + { + return isDistinct; + } + + public DataLimits forPaging(int pageSize) + { + return new CQLLimits(pageSize, perPartitionLimit, isDistinct); + } + + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + return new CQLPagingLimits(pageSize, perPartitionLimit, isDistinct, lastReturnedKey, lastReturnedKeyRemaining); + } + + public DataLimits forShortReadRetry(int toFetch) + { + // When we do a short read retry, we're only ever querying the single partition on which we have a short read. So + // we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch + // for both argument or just for perPartitionLimit with no limit on rowLimit). + return new CQLLimits(toFetch, NO_LIMIT, isDistinct); + } + - public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec) ++ public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData) + { + // We want the number of row that are currently live. Getting that precise number forces + // us to iterate the cached partition in general, but we can avoid that if: + // - The number of rows with at least one non-expiring cell is greater than what we ask, + // in which case we know we have enough live. + // - The number of rows is less than requested, in which case we know we won't have enough. 
+ if (cached.rowsWithNonExpiringCells() >= rowLimit) + return true; + + if (cached.rowCount() < rowLimit) + return false; + + // Otherwise, we need to re-count + - DataLimits.Counter counter = newCounter(nowInSec, false); ++ DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData); + try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false); + UnfilteredRowIterator iter = counter.applyTo(cacheIter)) + { + // Consume the iterator until we've counted enough + while (iter.hasNext()) + iter.next(); + return counter.isDone(); + } + } + - public Counter newCounter(int nowInSec, boolean assumeLiveData) ++ public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { - return new CQLCounter(nowInSec, assumeLiveData); ++ return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + } + + public int count() + { + return rowLimit; + } + + public int perPartitionCount() + { + return perPartitionLimit; + } + + public float estimateTotalResults(ColumnFamilyStore cfs) + { + // TODO: we should start storing stats on the number of rows (instead of the number of cells, which + // is what getMeanColumns returns) + float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size(); + return rowsPerPartition * (cfs.estimateKeys()); + } + + protected class CQLCounter extends Counter + { + protected final int nowInSec; + protected final boolean assumeLiveData; ++ protected final boolean countPartitionsWithOnlyStaticData; + + protected int rowCounted; + protected int rowInCurrentPartition; + + protected boolean hasLiveStaticRow; + - public CQLCounter(int nowInSec, boolean assumeLiveData) ++ public CQLCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { + this.nowInSec = nowInSec; + this.assumeLiveData = assumeLiveData; ++ this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData; + } + + @Override + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) + { + rowInCurrentPartition = 0; + hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec)); + } + + @Override + public Row applyToRow(Row row) + { + if (assumeLiveData || row.hasLiveData(nowInSec)) + incrementRowCount(); + return row; + } + + @Override + public void onPartitionClose() + { + // Normally, we don't count static rows as from a CQL point of view, it will be merge with other + // rows in the partition. However, if we only have the static row, it will be returned as one row + // so count it. 
- if (hasLiveStaticRow && rowInCurrentPartition == 0) ++ if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0) + incrementRowCount(); + super.onPartitionClose(); + } + + private void incrementRowCount() + { + if (++rowCounted >= rowLimit) + stop(); + if (++rowInCurrentPartition >= perPartitionLimit) + stopInPartition(); + } + + public int counted() + { + return rowCounted; + } + + public int countedInCurrentPartition() + { + return rowInCurrentPartition; + } + + public boolean isDone() + { + return rowCounted >= rowLimit; + } + + public boolean isDoneForPartition() + { + return isDone() || rowInCurrentPartition >= perPartitionLimit; + } + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + + if (rowLimit != NO_LIMIT) + { + sb.append("LIMIT ").append(rowLimit); + if (perPartitionLimit != NO_LIMIT) + sb.append(' '); + } + + if (perPartitionLimit != NO_LIMIT) + sb.append("PER PARTITION LIMIT ").append(perPartitionLimit); + + return sb.toString(); + } + } + + private static class CQLPagingLimits extends CQLLimits + { + private final ByteBuffer lastReturnedKey; + private final int lastReturnedKeyRemaining; + + public CQLPagingLimits(int rowLimit, int perPartitionLimit, boolean isDistinct, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + super(rowLimit, perPartitionLimit, isDistinct); + this.lastReturnedKey = lastReturnedKey; + this.lastReturnedKeyRemaining = lastReturnedKeyRemaining; + } + + @Override + public Kind kind() + { + return Kind.CQL_PAGING_LIMIT; + } + + @Override + public DataLimits forPaging(int pageSize) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + throw new UnsupportedOperationException(); + } + + @Override - public Counter newCounter(int nowInSec, boolean assumeLiveData) ++ public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { - return new PagingAwareCounter(nowInSec, assumeLiveData); ++ return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + } + + private class PagingAwareCounter extends CQLCounter + { - private PagingAwareCounter(int nowInSec, boolean assumeLiveData) ++ private PagingAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { - super(nowInSec, assumeLiveData); ++ super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + } + + @Override + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) + { + if (partitionKey.getKey().equals(lastReturnedKey)) + { + rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining; + // lastReturnedKey is the last key for which we're returned rows in the first page. + // So, since we know we have returned rows, we know we have accounted for the static row + // if any already, so force hasLiveStaticRow to false so we make sure to not count it + // once more. + hasLiveStaticRow = false; + } + else + { + super.applyToPartition(partitionKey, staticRow); + } + } + } + } + + /** + * Limits used by thrift; this count partition and cells. 
+ */ + private static class ThriftLimits extends DataLimits + { + protected final int partitionLimit; + protected final int cellPerPartitionLimit; + + private ThriftLimits(int partitionLimit, int cellPerPartitionLimit) + { + this.partitionLimit = partitionLimit; + this.cellPerPartitionLimit = cellPerPartitionLimit; + } + + public Kind kind() + { + return Kind.THRIFT_LIMIT; + } + + public boolean isUnlimited() + { + return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT; + } + + public boolean isDistinct() + { + return false; + } + + public DataLimits forPaging(int pageSize) + { + // We don't support paging on thrift in general but do use paging under the hood for get_count. For + // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single + // partition). We do check that the partition limit is 1 however to make sure this is not misused + // (as this wouldn't work properly for range queries). + assert partitionLimit == 1; + return new ThriftLimits(partitionLimit, pageSize); + } + + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + throw new UnsupportedOperationException(); + } + + public DataLimits forShortReadRetry(int toFetch) + { + // Short read retries are always done for a single partition at a time, so it's ok to ignore the + // partition limit for those + return new ThriftLimits(1, toFetch); + } + - public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec) ++ public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData) + { + // We want the number of cells that are currently live. Getting that precise number forces + // us to iterate the cached partition in general, but we can avoid that if: + // - The number of non-expiring live cells is greater than the number of cells asked (we then + // know we have enough live cells). + // - The number of cells cached is less than requested, in which case we know we won't have enough. 
+ if (cached.nonExpiringLiveCells() >= cellPerPartitionLimit) + return true; + + if (cached.nonTombstoneCellCount() < cellPerPartitionLimit) + return false; + + // Otherwise, we need to re-count - DataLimits.Counter counter = newCounter(nowInSec, false); ++ DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData); + try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false); + UnfilteredRowIterator iter = counter.applyTo(cacheIter)) + { + // Consume the iterator until we've counted enough + while (iter.hasNext()) + iter.next(); + return counter.isDone(); + } + } + - public Counter newCounter(int nowInSec, boolean assumeLiveData) ++ public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { + return new ThriftCounter(nowInSec, assumeLiveData); + } + + public int count() + { + return partitionLimit * cellPerPartitionLimit; + } + + public int perPartitionCount() + { + return cellPerPartitionLimit; + } + + public float estimateTotalResults(ColumnFamilyStore cfs) + { + // remember that getMeansColumns returns a number of cells: we should clean nomenclature + float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size(); + return cellsPerPartition * cfs.estimateKeys(); + } + + protected class ThriftCounter extends Counter + { + protected final int nowInSec; + protected final boolean assumeLiveData; + + protected int partitionsCounted; + protected int cellsCounted; + protected int cellsInCurrentPartition; + + public ThriftCounter(int nowInSec, boolean assumeLiveData) + { + this.nowInSec = nowInSec; + this.assumeLiveData = assumeLiveData; + } + + @Override + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) + { + cellsInCurrentPartition = 0; + if (!staticRow.isEmpty()) + applyToRow(staticRow); + } + + @Override + public Row applyToRow(Row row) + { + for (Cell cell : row.cells()) + { + if (assumeLiveData || cell.isLive(nowInSec)) + { + ++cellsCounted; + if (++cellsInCurrentPartition >= cellPerPartitionLimit) + stopInPartition(); + } + } + return row; + } + + @Override + public void onPartitionClose() + { + if (++partitionsCounted >= partitionLimit) + stop(); + super.onPartitionClose(); + } + + public int counted() + { + return cellsCounted; + } + + public int countedInCurrentPartition() + { + return cellsInCurrentPartition; + } + + public boolean isDone() + { + return partitionsCounted >= partitionLimit; + } + + public boolean isDoneForPartition() + { + return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit; + } + } + + @Override + public String toString() + { + // This is not valid CQL, but that's ok since it's not used for CQL queries. + return String.format("THRIFT LIMIT (partitions=%d, cells_per_partition=%d)", partitionLimit, cellPerPartitionLimit); + } + } + + /** + * Limits used for thrift get_count when we only want to count super columns. + */ + private static class SuperColumnCountingLimits extends ThriftLimits + { + private SuperColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit) + { + super(partitionLimit, cellPerPartitionLimit); + } + + public Kind kind() + { + return Kind.SUPER_COLUMN_COUNTING_LIMIT; + } + + public DataLimits forPaging(int pageSize) + { + // We don't support paging on thrift in general but do use paging under the hood for get_count. 
For + // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single + // partition). We do check that the partition limit is 1 however to make sure this is not misused + // (as this wouldn't work properly for range queries). + assert partitionLimit == 1; + return new SuperColumnCountingLimits(partitionLimit, pageSize); + } + + public DataLimits forShortReadRetry(int toFetch) + { + // Short read retries are always done for a single partition at a time, so it's ok to ignore the + // partition limit for those + return new SuperColumnCountingLimits(1, toFetch); + } + - public Counter newCounter(int nowInSec, boolean assumeLiveData) ++ @Override ++ public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { + return new SuperColumnCountingCounter(nowInSec, assumeLiveData); + } + + protected class SuperColumnCountingCounter extends ThriftCounter + { + public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData) + { + super(nowInSec, assumeLiveData); + } + + @Override + public Row applyToRow(Row row) + { + // In the internal format, a row == a super column, so that's what we want to count. + if (assumeLiveData || row.hasLiveData(nowInSec)) + { + ++cellsCounted; + if (++cellsInCurrentPartition >= cellPerPartitionLimit) + stopInPartition(); + } + return row; + } + } + } + + public static class Serializer + { + public void serialize(DataLimits limits, DataOutputPlus out, int version) throws IOException + { + out.writeByte(limits.kind().ordinal()); + switch (limits.kind()) + { + case CQL_LIMIT: + case CQL_PAGING_LIMIT: + CQLLimits cqlLimits = (CQLLimits)limits; + out.writeUnsignedVInt(cqlLimits.rowLimit); + out.writeUnsignedVInt(cqlLimits.perPartitionLimit); + out.writeBoolean(cqlLimits.isDistinct); + if (limits.kind() == Kind.CQL_PAGING_LIMIT) + { + CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits; + ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out); + out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining); + } + break; + case THRIFT_LIMIT: + case SUPER_COLUMN_COUNTING_LIMIT: + ThriftLimits thriftLimits = (ThriftLimits)limits; + out.writeUnsignedVInt(thriftLimits.partitionLimit); + out.writeUnsignedVInt(thriftLimits.cellPerPartitionLimit); + break; + } + } + + public DataLimits deserialize(DataInputPlus in, int version) throws IOException + { + Kind kind = Kind.values()[in.readUnsignedByte()]; + switch (kind) + { + case CQL_LIMIT: + case CQL_PAGING_LIMIT: + int rowLimit = (int)in.readUnsignedVInt(); + int perPartitionLimit = (int)in.readUnsignedVInt(); + boolean isDistinct = in.readBoolean(); + if (kind == Kind.CQL_LIMIT) + return new CQLLimits(rowLimit, perPartitionLimit, isDistinct); + + ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in); + int lastRemaining = (int)in.readUnsignedVInt(); + return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining); + case THRIFT_LIMIT: + case SUPER_COLUMN_COUNTING_LIMIT: + int partitionLimit = (int)in.readUnsignedVInt(); + int cellPerPartitionLimit = (int)in.readUnsignedVInt(); + return kind == Kind.THRIFT_LIMIT + ? 
new ThriftLimits(partitionLimit, cellPerPartitionLimit) + : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit); + } + throw new AssertionError(); + } + + public long serializedSize(DataLimits limits, int version) + { + long size = TypeSizes.sizeof((byte)limits.kind().ordinal()); + switch (limits.kind()) + { + case CQL_LIMIT: + case CQL_PAGING_LIMIT: + CQLLimits cqlLimits = (CQLLimits)limits; + size += TypeSizes.sizeofUnsignedVInt(cqlLimits.rowLimit); + size += TypeSizes.sizeofUnsignedVInt(cqlLimits.perPartitionLimit); + size += TypeSizes.sizeof(cqlLimits.isDistinct); + if (limits.kind() == Kind.CQL_PAGING_LIMIT) + { + CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits; + size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey); + size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining); + } + break; + case THRIFT_LIMIT: + case SUPER_COLUMN_COUNTING_LIMIT: + ThriftLimits thriftLimits = (ThriftLimits)limits; + size += TypeSizes.sizeofUnsignedVInt(thriftLimits.partitionLimit); + size += TypeSizes.sizeofUnsignedVInt(thriftLimits.cellPerPartitionLimit); + break; + default: + throw new AssertionError(); + } + return size; + } + } +}
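To make the counting semantics in the DataLimits change above concrete, here is a minimal, self-contained sketch of the CQLCounter bookkeeping: a global row limit, a per-partition limit, and the new countPartitionsWithOnlyStaticData flag under which a partition whose only live data is its static row still counts as one CQL row. This is illustrative Java over plain collections, not the Cassandra API; the names (SketchCounter, countPartition) are assumptions of the sketch.

import java.util.*;

// Stand-in for DataLimits.CQLCounter: counts "live" rows across partitions,
// honouring a global row limit and a per-partition limit. A partition that
// contributes only a live static row counts as one row when the
// countPartitionsWithOnlyStaticData flag is set (mirroring onPartitionClose above).
final class SketchCounter
{
    private final int rowLimit;
    private final int perPartitionLimit;
    private final boolean countPartitionsWithOnlyStaticData;
    private int rowCounted;

    SketchCounter(int rowLimit, int perPartitionLimit, boolean countPartitionsWithOnlyStaticData)
    {
        this.rowLimit = rowLimit;
        this.perPartitionLimit = perPartitionLimit;
        this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
    }

    boolean isDone()
    {
        return rowCounted >= rowLimit;
    }

    // Counts one partition: 'liveRows' stands in for the partition's live regular
    // rows, 'hasLiveStaticRow' for a non-empty live static row.
    void countPartition(List<String> liveRows, boolean hasLiveStaticRow)
    {
        int rowInCurrentPartition = 0;
        for (String ignored : liveRows)
        {
            if (isDone() || rowInCurrentPartition >= perPartitionLimit)
                break;
            rowCounted++;
            rowInCurrentPartition++;
        }
        // A partition whose only live data is the static row is still returned
        // as one CQL row, so count it as such when asked to.
        if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0)
            rowCounted++;
    }

    public static void main(String[] args)
    {
        SketchCounter counter = new SketchCounter(3, 2, true);
        counter.countPartition(Arrays.asList("a", "b", "c"), false);   // per-partition limit: counts 2
        counter.countPartition(Collections.<String>emptyList(), true); // static-only partition: counts 1
        System.out.println(counter.isDone());                          // true: 3 rows counted against LIMIT 3
    }
}

With rowLimit = 3 and perPartitionLimit = 2, the first partition contributes two rows and the static-only partition the third, so the counter reports done — the same short-circuit that stop()/stopInPartition() implement in the patch above.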
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/filter/RowFilter.java index 5ffe2ab,0000000..8d11038 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@@ -1,994 -1,0 +1,1009 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.filter; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.base.Objects; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.context.*; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.cql3.statements.RequestValidations.checkBindValueSet; +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull; + +/** + * A filter on which rows a given query should include or exclude. + * <p> + * This corresponds to the restrictions on rows that are not handled by the query + * {@link ClusteringIndexFilter}. Some of the expressions of this filter may + * be handled by a 2ndary index, and the rest is simply filtered out from the + * result set (the latter can only happen if the query was using ALLOW FILTERING). 
+ */ +public abstract class RowFilter implements Iterable<RowFilter.Expression> +{ + public static final Serializer serializer = new Serializer(); + public static final RowFilter NONE = new CQLFilter(Collections.emptyList()); + + protected final List<Expression> expressions; + + protected RowFilter(List<Expression> expressions) + { + this.expressions = expressions; + } + + public static RowFilter create() + { + return new CQLFilter(new ArrayList<>()); + } + + public static RowFilter create(int capacity) + { + return new CQLFilter(new ArrayList<>(capacity)); + } + + public static RowFilter forThrift(int capacity) + { + return new ThriftFilter(new ArrayList<>(capacity)); + } + + public void add(ColumnDefinition def, Operator op, ByteBuffer value) + { + add(new SimpleExpression(def, op, value)); + } + + public void addMapEquality(ColumnDefinition def, ByteBuffer key, Operator op, ByteBuffer value) + { + add(new MapEqualityExpression(def, key, op, value)); + } + + public void addThriftExpression(CFMetaData metadata, ByteBuffer name, Operator op, ByteBuffer value) + { + assert (this instanceof ThriftFilter); + add(new ThriftExpression(metadata, name, op, value)); + } + + public void addCustomIndexExpression(CFMetaData cfm, IndexMetadata targetIndex, ByteBuffer value) + { + add(new CustomExpression(cfm, targetIndex, value)); + } + + private void add(Expression expression) + { + expression.validate(); + expressions.add(expression); + } + + public List<Expression> getExpressions() + { + return expressions; + } + + /** ++ * Checks if some of the expressions apply to clustering or regular columns. ++ * @return {@code true} if some of the expressions apply to clustering or regular columns, {@code false} otherwise. ++ */ ++ public boolean hasExpressionOnClusteringOrRegularColumns() ++ { ++ for (Expression expression : expressions) ++ { ++ ColumnDefinition column = expression.column(); ++ if (column.isClusteringColumn() || column.isRegular()) ++ return true; ++ } ++ return false; ++ } ++ ++ /** + * Filters the provided iterator so that only the row satisfying the expression of this filter + * are included in the resulting iterator. + * + * @param iter the iterator to filter + * @param nowInSec the time of query in seconds. + * @return the filtered iterator. + */ + public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec); + + /** + * Whether the provided row in the provided partition satisfies this filter. + * + * @param metadata the table metadata. + * @param partitionKey the partition key for partition to test. + * @param row the row to test. + * @param nowInSec the current time in seconds (to know what is live and what isn't). + * @return {@code true} if {@code row} in partition {@code partitionKey} satisfies this row filter. + */ + public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row, int nowInSec) + { + // We purge all tombstones as the expressions isSatisfiedBy methods expects it + Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec); + if (purged == null) + return expressions.isEmpty(); + + for (Expression e : expressions) + { + if (!e.isSatisfiedBy(metadata, partitionKey, purged)) + return false; + } + return true; + } + + /** + * Returns true if all of the expressions within this filter that apply to the partition key are satisfied by + * the given key, false otherwise. 
+ */ + public boolean partitionKeyRestrictionsAreSatisfiedBy(DecoratedKey key, AbstractType<?> keyValidator) + { + for (Expression e : expressions) + { + if (!e.column.isPartitionKey()) + continue; + + ByteBuffer value = keyValidator instanceof CompositeType + ? ((CompositeType) keyValidator).split(key.getKey())[e.column.position()] + : key.getKey(); + if (!e.operator().isSatisfiedBy(e.column.type, value, e.value)) + return false; + } + return true; + } + + /** + * Returns true if all of the expressions within this filter that apply to the clustering key are satisfied by + * the given Clustering, false otherwise. + */ + public boolean clusteringKeyRestrictionsAreSatisfiedBy(Clustering clustering) + { + for (Expression e : expressions) + { + if (!e.column.isClusteringColumn()) + continue; + + if (!e.operator().isSatisfiedBy(e.column.type, clustering.get(e.column.position()), e.value)) + { + return false; + } + } + return true; + } + + /** + * Returns this filter but without the provided expression. This method + * *assumes* that the filter contains the provided expression. + */ + public RowFilter without(Expression expression) + { + assert expressions.contains(expression); + if (expressions.size() == 1) + return RowFilter.NONE; + + List<Expression> newExpressions = new ArrayList<>(expressions.size() - 1); + for (Expression e : expressions) + if (!e.equals(expression)) + newExpressions.add(e); + + return withNewExpressions(newExpressions); + } + + protected abstract RowFilter withNewExpressions(List<Expression> expressions); + + public boolean isEmpty() + { + return expressions.isEmpty(); + } + + public Iterator<Expression> iterator() + { + return expressions.iterator(); + } + + private static Clustering makeCompactClustering(CFMetaData metadata, ByteBuffer name) + { + assert metadata.isCompactTable(); + if (metadata.isCompound()) + { + List<ByteBuffer> values = CompositeType.splitName(name); + return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()])); + } + else + { + return new Clustering(name); + } + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < expressions.size(); i++) + { + if (i > 0) + sb.append(" AND "); + sb.append(expressions.get(i)); + } + return sb.toString(); + } + + private static class CQLFilter extends RowFilter + { + private CQLFilter(List<Expression> expressions) + { + super(expressions); + } + + public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) + { + if (expressions.isEmpty()) + return iter; + + final CFMetaData metadata = iter.metadata(); + long numberOfStaticColumnExpressions = expressions.stream().filter(e -> e.column.isStatic()).count(); + final boolean filterStaticColumns = numberOfStaticColumnExpressions != 0; + final boolean filterNonStaticColumns = (expressions.size() - numberOfStaticColumnExpressions) > 0; + + class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator> + { + DecoratedKey pk; + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + // The filter might be on static columns, so need to check static row first. + if (filterStaticColumns && applyToRow(partition.staticRow()) == null) + return null; + + pk = partition.partitionKey(); + UnfilteredRowIterator iterator = Transformation.apply(partition, this); + + return (filterNonStaticColumns && !iterator.hasNext()) ? 
null : iterator; + } + + public Row applyToRow(Row row) + { + Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec); + if (purged == null) + return null; + + for (Expression e : expressions) + if (!e.isSatisfiedBy(metadata, pk, purged)) + return null; + return row; + } + } + + return Transformation.apply(iter, new IsSatisfiedFilter()); + } + + protected RowFilter withNewExpressions(List<Expression> expressions) + { + return new CQLFilter(expressions); + } + } + + private static class ThriftFilter extends RowFilter + { + private ThriftFilter(List<Expression> expressions) + { + super(expressions); + } + + public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec) + { + if (expressions.isEmpty()) + return iter; + + class IsSatisfiedThriftFilter extends Transformation<UnfilteredRowIterator> + { + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) + { + // Thrift does not filter rows, it filters the entire partition if any of the expressions is not + // satisfied, which forces us to materialize the result (in theory we could materialize only + // what we need which might or might not be everything, but we keep it simple since in practice + // it has never been worth it). + ImmutableBTreePartition result = ImmutableBTreePartition.create(iter); + iter.close(); + + // The partition needs to have a row for every expression, and each expression needs to be satisfied. + for (Expression expr : expressions) + { + assert expr instanceof ThriftExpression; + Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes)); + if (row == null || !expr.isSatisfiedBy(iter.metadata(), iter.partitionKey(), row)) + return null; + } + // If we get here, it means all expressions were satisfied, so return the original result + return result.unfilteredIterator(); + } + } + return Transformation.apply(iter, new IsSatisfiedThriftFilter()); + } + + protected RowFilter withNewExpressions(List<Expression> expressions) + { + return new ThriftFilter(expressions); + } + } + + public static abstract class Expression + { + private static final Serializer serializer = new Serializer(); + + // Note: the order of this enum matters, it's used for serialization + protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM } + + abstract Kind kind(); + protected final ColumnDefinition column; + protected final Operator operator; + protected final ByteBuffer value; + + protected Expression(ColumnDefinition column, Operator operator, ByteBuffer value) + { + this.column = column; + this.operator = operator; + this.value = value; + } + + public boolean isCustom() + { + return kind() == Kind.CUSTOM; + } + + public ColumnDefinition column() + { + return column; + } + + public Operator operator() + { + return operator; + } + + /** + * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> operator. + * + * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> + * operator, <code>false</code> otherwise. + */ + public boolean isContains() + { + return Operator.CONTAINS == operator; + } + + /** + * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> operator. + * + * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> + * operator, <code>false</code> otherwise. 
+ */ + public boolean isContainsKey() + { + return Operator.CONTAINS_KEY == operator; + } + + /** + * If this expression is used to query an index, the value to use as + * partition key for that index query. + */ + public ByteBuffer getIndexValue() + { + return value; + } + + public void validate() + { + checkNotNull(value, "Unsupported null value for column %s", column.name); + checkBindValueSet(value, "Unsupported unset value for column %s", column.name); + } + + @Deprecated + public void validateForIndexing() + { + checkFalse(value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT, + "Index expression values may not be larger than 64K"); + } + + /** + * Returns whether the provided row satisfied this expression or not. + * + * @param partitionKey the partition key for row to check. + * @param row the row to check. It should *not* contain deleted cells + * (i.e. it should come from a RowIterator). + * @return whether the row is satisfied by this expression. + */ + public abstract boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row); + + protected ByteBuffer getValue(CFMetaData metadata, DecoratedKey partitionKey, Row row) + { + switch (column.kind) + { + case PARTITION_KEY: + return metadata.getKeyValidator() instanceof CompositeType + ? CompositeType.extractComponent(partitionKey.getKey(), column.position()) + : partitionKey.getKey(); + case CLUSTERING: + return row.clustering().get(column.position()); + default: + Cell cell = row.getCell(column); + return cell == null ? null : cell.value(); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof Expression)) + return false; + + Expression that = (Expression)o; + + return Objects.equal(this.kind(), that.kind()) + && Objects.equal(this.column.name, that.column.name) + && Objects.equal(this.operator, that.operator) + && Objects.equal(this.value, that.value); + } + + @Override + public int hashCode() + { + return Objects.hashCode(column.name, operator, value); + } + + private static class Serializer + { + public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException + { + if (version >= MessagingService.VERSION_30) + out.writeByte(expression.kind().ordinal()); + + // Custom expressions include neither a column or operator, but all + // other expressions do. 
Also, custom expressions are 3.0+ only, so + // the column & operator will always be the first things written for + // any pre-3.0 version + if (expression.kind() == Kind.CUSTOM) + { + assert version >= MessagingService.VERSION_30; + IndexMetadata.serializer.serialize(((CustomExpression)expression).targetIndex, out, version); + ByteBufferUtil.writeWithShortLength(expression.value, out); + return; + } + + ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out); + expression.operator.writeTo(out); + + switch (expression.kind()) + { + case SIMPLE: + ByteBufferUtil.writeWithShortLength(((SimpleExpression)expression).value, out); + break; + case MAP_EQUALITY: + MapEqualityExpression mexpr = (MapEqualityExpression)expression; + if (version < MessagingService.VERSION_30) + { + ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out); + } + else + { + ByteBufferUtil.writeWithShortLength(mexpr.key, out); + ByteBufferUtil.writeWithShortLength(mexpr.value, out); + } + break; + case THRIFT_DYN_EXPR: + ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out); + break; + } + } + + public Expression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException + { + Kind kind = null; + ByteBuffer name; + Operator operator; + ColumnDefinition column; + + if (version >= MessagingService.VERSION_30) + { + kind = Kind.values()[in.readByte()]; + // custom expressions (3.0+ only) do not contain a column or operator, only a value + if (kind == Kind.CUSTOM) + { + return new CustomExpression(metadata, + IndexMetadata.serializer.deserialize(in, version, metadata), + ByteBufferUtil.readWithShortLength(in)); + } + } + + name = ByteBufferUtil.readWithShortLength(in); + operator = Operator.readFrom(in); + column = metadata.getColumnDefinition(name); + if (!metadata.isCompactTable() && column == null) + throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization"); + + if (version < MessagingService.VERSION_30) + { + if (column == null) + kind = Kind.THRIFT_DYN_EXPR; + else if (column.type instanceof MapType && operator == Operator.EQ) + kind = Kind.MAP_EQUALITY; + else + kind = Kind.SIMPLE; + } + + assert kind != null; + switch (kind) + { + case SIMPLE: + return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in)); + case MAP_EQUALITY: + ByteBuffer key, value; + if (version < MessagingService.VERSION_30) + { + ByteBuffer composite = ByteBufferUtil.readWithShortLength(in); + key = CompositeType.extractComponent(composite, 0); + value = CompositeType.extractComponent(composite, 1); + } + else + { + key = ByteBufferUtil.readWithShortLength(in); + value = ByteBufferUtil.readWithShortLength(in); + } + return new MapEqualityExpression(column, key, operator, value); + case THRIFT_DYN_EXPR: + return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in)); + } + throw new AssertionError(); + } + + + public long serializedSize(Expression expression, int version) + { + // version 3.0+ includes a byte for Kind + long size = version >= MessagingService.VERSION_30 ? 
1 : 0; + + // custom expressions don't include a column or operator, all other expressions do + if (expression.kind() != Kind.CUSTOM) + size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes) + + expression.operator.serializedSize(); + + switch (expression.kind()) + { + case SIMPLE: + size += ByteBufferUtil.serializedSizeWithShortLength(((SimpleExpression)expression).value); + break; + case MAP_EQUALITY: + MapEqualityExpression mexpr = (MapEqualityExpression)expression; + if (version < MessagingService.VERSION_30) + size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue()); + else + size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key) + + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value); + break; + case THRIFT_DYN_EXPR: + size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value); + break; + case CUSTOM: + if (version >= MessagingService.VERSION_30) + size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version) + + ByteBufferUtil.serializedSizeWithShortLength(expression.value); + break; + } + return size; + } + } + } + + /** + * An expression of the form 'column' 'op' 'value'. + */ + private static class SimpleExpression extends Expression + { + public SimpleExpression(ColumnDefinition column, Operator operator, ByteBuffer value) + { + super(column, operator, value); + } + + public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row) + { + // We support null conditions for LWT (in ColumnCondition) but not for RowFilter. + // TODO: we should try to merge both code someday. + assert value != null; + + if (row.isStatic() != column.isStatic()) + return true; + + switch (operator) + { + case EQ: + case LT: + case LTE: + case GTE: + case GT: + { + assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types"; + + // In order to support operators on Counter types, their value has to be extracted from internal + // representation. See CASSANDRA-11629 + if (column.type.isCounter()) + { + ByteBuffer foundValue = getValue(metadata, partitionKey, row); + if (foundValue == null) + return false; + + ByteBuffer counterValue = LongType.instance.decompose(CounterContext.instance().total(foundValue)); + return operator.isSatisfiedBy(LongType.instance, counterValue, value); + } + else + { + // Note that CQL expression are always of the form 'x < 4', i.e. the tested value is on the left. + ByteBuffer foundValue = getValue(metadata, partitionKey, row); + return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value); + } + } + case NEQ: + { + assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types"; + ByteBuffer foundValue = getValue(metadata, partitionKey, row); + // Note that CQL expression are always of the form 'x < 4', i.e. the tested value is on the left. 
+ return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value); + } + case CONTAINS: + assert column.type.isCollection(); + CollectionType<?> type = (CollectionType<?>)column.type; + if (column.isComplex()) + { + ComplexColumnData complexData = row.getComplexColumnData(column); + if (complexData != null) + { + for (Cell cell : complexData) + { + if (type.kind == CollectionType.Kind.SET) + { + if (type.nameComparator().compare(cell.path().get(0), value) == 0) + return true; + } + else + { + if (type.valueComparator().compare(cell.value(), value) == 0) + return true; + } + } + } + return false; + } + else + { + ByteBuffer foundValue = getValue(metadata, partitionKey, row); + if (foundValue == null) + return false; + + switch (type.kind) + { + case LIST: + ListType<?> listType = (ListType<?>)type; + return listType.compose(foundValue).contains(listType.getElementsType().compose(value)); + case SET: + SetType<?> setType = (SetType<?>)type; + return setType.compose(foundValue).contains(setType.getElementsType().compose(value)); + case MAP: + MapType<?,?> mapType = (MapType<?, ?>)type; + return mapType.compose(foundValue).containsValue(mapType.getValuesType().compose(value)); + } + throw new AssertionError(); + } + case CONTAINS_KEY: + assert column.type.isCollection() && column.type instanceof MapType; + MapType<?, ?> mapType = (MapType<?, ?>)column.type; + if (column.isComplex()) + { + return row.getCell(column, CellPath.create(value)) != null; + } + else + { + ByteBuffer foundValue = getValue(metadata, partitionKey, row); + return foundValue != null && mapType.getSerializer().getSerializedValue(foundValue, value, mapType.getKeysType()) != null; + } + + case IN: + // It wouldn't be terribly hard to support this (though doing so would imply supporting + // IN for 2ndary index) but currently we don't. + throw new AssertionError(); + } + throw new AssertionError(); + } + + @Override + public String toString() + { + AbstractType<?> type = column.type; + switch (operator) + { + case CONTAINS: + assert type instanceof CollectionType; + CollectionType<?> ct = (CollectionType<?>)type; + type = ct.kind == CollectionType.Kind.SET ? ct.nameComparator() : ct.valueComparator(); + break; + case CONTAINS_KEY: + assert type instanceof MapType; + type = ((MapType<?, ?>)type).nameComparator(); + break; + case IN: + type = ListType.getInstance(type, false); + break; + default: + break; + } + return String.format("%s %s %s", column.name, operator, type.getString(value)); + } + + @Override + Kind kind() + { + return Kind.SIMPLE; + } + } + + /** + * An expression of the form 'column' ['key'] = 'value' (which is only + * supported when 'column' is a map). 
+ */ + private static class MapEqualityExpression extends Expression + { + private final ByteBuffer key; + + public MapEqualityExpression(ColumnDefinition column, ByteBuffer key, Operator operator, ByteBuffer value) + { + super(column, operator, value); + assert column.type instanceof MapType && operator == Operator.EQ; + this.key = key; + } + + @Override + public void validate() throws InvalidRequestException + { + checkNotNull(key, "Unsupported null map key for column %s", column.name); + checkBindValueSet(key, "Unsupported unset map key for column %s", column.name); + checkNotNull(value, "Unsupported null map value for column %s", column.name); + checkBindValueSet(value, "Unsupported unset map value for column %s", column.name); + } + + @Override + public ByteBuffer getIndexValue() + { + return CompositeType.build(key, value); + } + + public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row) + { + assert key != null; + // We support null conditions for LWT (in ColumnCondition) but not for RowFilter. + // TODO: we should try to merge both code someday. + assert value != null; + + if (row.isStatic() != column.isStatic()) + return true; + + MapType<?, ?> mt = (MapType<?, ?>)column.type; + if (column.isComplex()) + { + Cell cell = row.getCell(column, CellPath.create(key)); + return cell != null && mt.valueComparator().compare(cell.value(), value) == 0; + } + else + { + ByteBuffer serializedMap = getValue(metadata, partitionKey, row); + if (serializedMap == null) + return false; + + ByteBuffer foundValue = mt.getSerializer().getSerializedValue(serializedMap, key, mt.getKeysType()); + return foundValue != null && mt.valueComparator().compare(foundValue, value) == 0; + } + } + + @Override + public String toString() + { + MapType<?, ?> mt = (MapType<?, ?>)column.type; + return String.format("%s[%s] = %s", column.name, mt.nameComparator().getString(key), mt.valueComparator().getString(value)); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof MapEqualityExpression)) + return false; + + MapEqualityExpression that = (MapEqualityExpression)o; + + return Objects.equal(this.column.name, that.column.name) + && Objects.equal(this.operator, that.operator) + && Objects.equal(this.key, that.key) + && Objects.equal(this.value, that.value); + } + + @Override + public int hashCode() + { + return Objects.hashCode(column.name, operator, key, value); + } + + @Override + Kind kind() + { + return Kind.MAP_EQUALITY; + } + } + + /** + * An expression of the form 'name' = 'value', but where 'name' is actually the + * clustering value for a compact table. This is only for thrift. + */ + private static class ThriftExpression extends Expression + { + public ThriftExpression(CFMetaData metadata, ByteBuffer name, Operator operator, ByteBuffer value) + { + super(makeDefinition(metadata, name), operator, value); + assert metadata.isCompactTable(); + } + + private static ColumnDefinition makeDefinition(CFMetaData metadata, ByteBuffer name) + { + ColumnDefinition def = metadata.getColumnDefinition(name); + if (def != null) + return def; + + // In thrift, we actually allow expression on non-defined columns for the sake of filtering. To accomodate + // this we create a "fake" definition. This is messy but it works so is probably good enough. 
+ return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type); + } + + public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row) + { + assert value != null; + + // On thrift queries, even if the column expression is a "static" one, we'll have convert it as a "dynamic" + // one in ThriftResultsMerger, so we always expect it to be a dynamic one. Further, we expect this is only + // called when the row clustering does match the column (see ThriftFilter above). + assert row.clustering().equals(makeCompactClustering(metadata, column.name.bytes)); + Cell cell = row.getCell(metadata.compactValueColumn()); + return cell != null && operator.isSatisfiedBy(column.type, cell.value(), value); + } + + @Override + public String toString() + { + return String.format("%s %s %s", column.name, operator, column.type.getString(value)); + } + + @Override + Kind kind() + { + return Kind.THRIFT_DYN_EXPR; + } + } + + /** + * A custom index expression for use with 2i implementations which support custom syntax and which are not + * necessarily linked to a single column in the base table. + */ + public static final class CustomExpression extends Expression + { + private final IndexMetadata targetIndex; + private final CFMetaData cfm; + + public CustomExpression(CFMetaData cfm, IndexMetadata targetIndex, ByteBuffer value) + { + // The operator is not relevant, but Expression requires it so for now we just hardcode EQ + super(makeDefinition(cfm, targetIndex), Operator.EQ, value); + this.targetIndex = targetIndex; + this.cfm = cfm; + } + + private static ColumnDefinition makeDefinition(CFMetaData cfm, IndexMetadata index) + { + // Similarly to how we handle non-defined columns in thift, we create a fake column definition to + // represent the target index. This is definitely something that can be improved though. + return ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(index.name.getBytes()), BytesType.instance); + } + + public IndexMetadata getTargetIndex() + { + return targetIndex; + } + + public ByteBuffer getValue() + { + return value; + } + + public String toString() + { + return String.format("expr(%s, %s)", + targetIndex.name, + Keyspace.openAndGetStore(cfm) + .indexManager + .getIndex(targetIndex) + .customExpressionValueType()); + } + + Kind kind() + { + return Kind.CUSTOM; + } + + // Filtering by custom expressions isn't supported yet, so just accept any row + public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row) + { + return true; + } + } + + public static class Serializer + { + public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException + { + out.writeBoolean(filter instanceof ThriftFilter); + out.writeUnsignedVInt(filter.expressions.size()); + for (Expression expr : filter.expressions) + Expression.serializer.serialize(expr, out, version); + + } + + public RowFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException + { + boolean forThrift = in.readBoolean(); + int size = (int)in.readUnsignedVInt(); + List<Expression> expressions = new ArrayList<>(size); + for (int i = 0; i < size; i++) + expressions.add(Expression.serializer.deserialize(in, version, metadata)); + + return forThrift + ? 
new ThriftFilter(expressions) + : new CQLFilter(expressions); + } + + public long serializedSize(RowFilter filter, int version) + { + long size = 1 // forThrift + + TypeSizes.sizeofUnsignedVInt(filter.expressions.size()); + for (Expression expr : filter.expressions) + size += Expression.serializer.serializedSize(expr, version); + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/CacheService.java index c51a5d1,a13a52d..d23bdb0 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@@ -439,13 -414,10 +439,13 @@@ public class CacheService implements Ca { public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception { - DecoratedKey key = cfs.partitioner.decorateKey(buffer); - QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE); - ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE); - return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) data); + DecoratedKey key = cfs.decorateKey(buffer); + int nowInSec = FBUtilities.nowInSeconds(); + try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op)) + { - CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec); ++ CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec, true), nowInSec); + return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache); + } } }); }
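Closing with the RowFilter side of the commit, here is a comparably minimal sketch of its core contract: a conjunction of (column, operator, value) expressions applied row by row, where a row survives only if every expression is satisfied and a missing value fails its expression, mirroring RowFilter.isSatisfiedBy after tombstone purging. Plain maps stand in for rows, a BiPredicate stands in for Operator.isSatisfiedBy, and all names here are illustrative assumptions, not the Cassandra API.

import java.util.*;
import java.util.function.BiPredicate;

// Stand-in for RowFilter: a list of (column, operator, value) expressions ANDed together.
// A "row" here is just a column->value map.
final class SketchRowFilter
{
    private static final class Expression
    {
        final String column;
        final BiPredicate<String, String> op;
        final String value;

        Expression(String column, BiPredicate<String, String> op, String value)
        {
            this.column = column;
            this.op = op;
            this.value = value;
        }
    }

    private final List<Expression> expressions = new ArrayList<>();

    void add(String column, BiPredicate<String, String> op, String value)
    {
        expressions.add(new Expression(column, op, value));
    }

    // Mirrors RowFilter.isSatisfiedBy: every expression must hold, and a missing
    // column value fails its expression (tombstones are assumed already purged).
    boolean isSatisfiedBy(Map<String, String> row)
    {
        for (Expression e : expressions)
        {
            String found = row.get(e.column);
            if (found == null || !e.op.test(found, e.value))
                return false;
        }
        return true;
    }

    public static void main(String[] args)
    {
        SketchRowFilter filter = new SketchRowFilter();
        filter.add("status", String::equals, "open");   // status = 'open'
        filter.add("owner", String::contains, "team-"); // owner CONTAINS 'team-'

        System.out.println(filter.isSatisfiedBy(Map.of("status", "open", "owner", "team-storage")));   // true
        System.out.println(filter.isSatisfiedBy(Map.of("status", "closed", "owner", "team-storage"))); // false
    }
}

Map.of requires Java 9+ and is used only to keep the example short; the short-circuit on the first failed expression is the same behaviour the IsSatisfiedFilter transformation above applies per row.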