http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/DataResolver.java index 60cfbba,0000000..c96a893 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@@ -1,498 -1,0 +1,498 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.TimeoutException; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.net.*; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; + +public class DataResolver extends ResponseResolver +{ + @VisibleForTesting + final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); + + public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) + { + super(keyspace, command, consistency, maxResponseCount); + } + + public PartitionIterator getData() + { + ReadResponse response = responses.iterator().next().payload; + return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec()); + } + + public PartitionIterator resolve() + { + // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here + // at the beginning of this method), so grab the response count once and use that through the method. 
+ int count = responses.size(); + List<UnfilteredPartitionIterator> iters = new ArrayList<>(count); + InetAddress[] sources = new InetAddress[count]; + for (int i = 0; i < count; i++) + { + MessageIn<ReadResponse> msg = responses.get(i); + iters.add(msg.payload.makeIterator(command)); + sources[i] = msg.from; + } + + // Even though every responses should honor the limit, we might have more than requested post reconciliation, + // so ensure we're respecting the limit. - DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true); ++ DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition()); + return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter)); + } + + public void compareResponses() + { + // We need to fully consume the results to trigger read repairs if appropriate + try (PartitionIterator iterator = resolve()) + { + PartitionIterators.consume(iterator); + } + } + + private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter) + { + // If we have only one results, there is no read repair to do and we can't get short reads + if (results.size() == 1) + return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec()); + + UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources); + + // So-called "short reads" stems from nodes returning only a subset of the results they have for a partition due to the limit, + // but that subset not being enough post-reconciliation. So if we don't have limit, don't bother. + if (!command.limits().isUnlimited()) + { + for (int i = 0; i < results.size(); i++) + results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter))); + } + + return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener); + } + + private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener + { + private final InetAddress[] sources; + + public RepairMergeListener(InetAddress[] sources) + { + this.sources = sources; + } + + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + return new MergeListener(partitionKey, columns(versions), isReversed(versions)); + } + + private PartitionColumns columns(List<UnfilteredRowIterator> versions) + { + Columns statics = Columns.NONE; + Columns regulars = Columns.NONE; + for (UnfilteredRowIterator iter : versions) + { + if (iter == null) + continue; + + PartitionColumns cols = iter.columns(); + statics = statics.mergeTo(cols.statics); + regulars = regulars.mergeTo(cols.regulars); + } + return new PartitionColumns(statics, regulars); + } + + private boolean isReversed(List<UnfilteredRowIterator> versions) + { + for (UnfilteredRowIterator iter : versions) + { + if (iter == null) + continue; + + // Everything will be in the same order + return iter.isReverseOrder(); + } + + assert false : "Expected at least one iterator"; + return false; + } + + public void close() + { + try + { + FBUtilities.waitOnFutures(repairResults, DatabaseDescriptor.getWriteRpcTimeout()); + } + catch (TimeoutException ex) + { + // We got all responses, but timed out while repairing + int blockFor = consistency.blockFor(keyspace); + if (Tracing.isTracing()) + Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); + else + 
logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor); + + throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true); + } + } + + private class MergeListener implements UnfilteredRowIterators.MergeListener + { + private final DecoratedKey partitionKey; + private final PartitionColumns columns; + private final boolean isReversed; + private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length]; + + private final Row.Builder[] currentRows = new Row.Builder[sources.length]; + private final RowDiffListener diffListener; + + // The partition level deletion for the merge row. + private DeletionTime partitionLevelDeletion; + // When merged has a currently open marker, its time. null otherwise. + private DeletionTime mergedDeletionTime; + // For each source, the time of the current deletion as known by the source. + private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length]; + // For each source, record if there is an open range to send as repair, and from where. + private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length]; + + public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed) + { + this.partitionKey = partitionKey; + this.columns = columns; + this.isReversed = isReversed; + + this.diffListener = new RowDiffListener() + { + public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) + { + if (merged != null && !merged.equals(original)) + currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged); + } + + public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) + { + if (merged != null && !merged.equals(original)) + currentRow(i, clustering).addRowDeletion(merged); + } + + public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original) + { + if (merged != null && !merged.equals(original)) + currentRow(i, clustering).addComplexDeletion(column, merged); + } + + public void onCell(int i, Clustering clustering, Cell merged, Cell original) + { + if (merged != null && !merged.equals(original)) + currentRow(i, clustering).addCell(merged); + } + + }; + } + + private PartitionUpdate update(int i) + { + if (repairs[i] == null) + repairs[i] = new PartitionUpdate(command.metadata(), partitionKey, columns, 1); + return repairs[i]; + } + + private Row.Builder currentRow(int i, Clustering clustering) + { + if (currentRows[i] == null) + { + currentRows[i] = BTreeRow.sortedBuilder(); + currentRows[i].newRow(clustering); + } + return currentRows[i]; + } + + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + this.partitionLevelDeletion = mergedDeletion; + for (int i = 0; i < versions.length; i++) + { + if (mergedDeletion.supersedes(versions[i])) + update(i).addPartitionDeletion(mergedDeletion); + } + } + + public void onMergedRows(Row merged, Row[] versions) + { + // If a row was shadowed post merged, it must be by a partition level or range tombstone, and we handle + // those case directly in their respective methods (in other words, it would be inefficient to send a row + // deletion as repair when we know we've already send a partition level or range tombstone that covers it). 
+ if (merged.isEmpty()) + return; + + Rows.diff(diffListener, merged, versions); + for (int i = 0; i < currentRows.length; i++) + { + if (currentRows[i] != null) + update(i).add(currentRows[i].build()); + } + Arrays.fill(currentRows, null); + } + + private DeletionTime currentDeletion() + { + return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime; + } + + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + // The current deletion as of dealing with this marker. + DeletionTime currentDeletion = currentDeletion(); + + for (int i = 0; i < versions.length; i++) + { + RangeTombstoneMarker marker = versions[i]; + + // Update what the source now thinks is the current deletion + if (marker != null) + sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null; + + // If merged == null, some of the source is opening or closing a marker + if (merged == null) + { + // but if it's not this source, move to the next one + if (marker == null) + continue; + + // We have a close and/or open marker for a source, with nothing corresponding in merged. + // Because merged is a superset, this imply that we have a current deletion (being it due to an + // early opening in merged or a partition level deletion) and that this deletion will still be + // active after that point. Further whatever deletion was open or is open by this marker on the + // source, that deletion cannot supersedes the current one. + // + // But while the marker deletion (before and/or after this point) cannot supersed the current + // deletion, we want to know if it's equal to it (both before and after), because in that case + // the source is up to date and we don't want to include repair. + // + // So in practice we have 2 possible case: + // 1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null). Then + // it won't be from that point on unless it's a boundary and the new opened deletion time + // is also equal to the current deletion (note that this implies the boundary has the same + // closing and opening deletion time, which should generally not happen, but can due to legacy + // reading code not avoiding this for a while, see CASSANDRA-13237). + // 2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and + // it may now be (if it isn't we just have nothing to do for that marker). + assert !currentDeletion.isLive() : currentDeletion.toString(); + + if (markerToRepair[i] == null) + { + // Since there is an ongoing merged deletion, the only way we don't have an open repair for + // this source is that it had a range open with the same deletion as current and it's + // closing it. + assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)) + : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); + + // and so unless it's a boundary whose opening deletion time is still equal to the current + // deletion (see comment above for why this can actually happen), we have to repair the source + // from that point on. 
+ if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))) + markerToRepair[i] = marker.closeBound(isReversed).invert(); + } + // In case 2) above, we only have something to do if the source is up-to-date after that point + else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))) + { + closeOpenMarker(i, marker.openBound(isReversed).invert()); + } + } + else + { + // We have a change of current deletion in merged (potentially to/from no deletion at all). + + if (merged.isClose(isReversed)) + { + // We're closing the merged range. If we've marked the source as needing to be repaired for + // that range, close and add it to the repair to be sent. + if (markerToRepair[i] != null) + closeOpenMarker(i, merged.closeBound(isReversed)); + + } + + if (merged.isOpen(isReversed)) + { + // If we're opening a new merged range (or just switching deletion), then unless the source + // is up to date on that deletion (note that we've updated what the source deleteion is + // above), we'll have to sent the range to the source. + DeletionTime newDeletion = merged.openDeletionTime(isReversed); + DeletionTime sourceDeletion = sourceDeletionTime[i]; + if (!newDeletion.equals(sourceDeletion)) + markerToRepair[i] = merged.openBound(isReversed); + } + } + } + + if (merged != null) + mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null; + } + + private void closeOpenMarker(int i, Slice.Bound close) + { + Slice.Bound open = markerToRepair[i]; + update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion())); + markerToRepair[i] = null; + } + + public void close() + { + for (int i = 0; i < repairs.length; i++) + { + if (repairs[i] == null) + continue; + + // use a separate verb here because we don't want these to be get the white glove hint- + // on-timeout behavior that a "real" mutation gets + Tracing.trace("Sending read-repair-mutation to {}", sources[i]); + MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR); + repairResults.add(MessagingService.instance().sendRR(msg, sources[i])); + } + } + } + } + + private class ShortReadProtection extends Transformation<UnfilteredRowIterator> + { + private final InetAddress source; + private final DataLimits.Counter counter; + private final DataLimits.Counter postReconciliationCounter; + + private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter) + { + this.source = source; - this.counter = command.limits().newCounter(command.nowInSec(), false).onlyCount(); ++ this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount(); + this.postReconciliationCounter = postReconciliationCounter; + } + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + partition = Transformation.apply(partition, counter); + // must apply and extend with same protection instance + ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey()); + partition = MoreRows.extend(partition, protection); + partition = Transformation.apply(partition, protection); // apply after, so it is retained when we extend (in case we need to reextend) + return partition; + } + + private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator> + { + final CFMetaData metadata; + final 
DecoratedKey partitionKey; + Clustering lastClustering; + int lastCount = 0; + + private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey) + { + this.metadata = metadata; + this.partitionKey = partitionKey; + } + + @Override + public Row applyToRow(Row row) + { + lastClustering = row.clustering(); + return row; + } + + @Override + public UnfilteredRowIterator moreContents() + { + // We have a short read if the node this is the result of has returned the requested number of + // rows for that partition (i.e. it has stopped returning results due to the limit), but some of + // those results haven't made it in the final result post-reconciliation due to other nodes + // tombstones. If that is the case, then the node might have more results that we should fetch + // as otherwise we might return less results than required, or results that shouldn't be returned + // (because the node has tombstone that hides future results from other nodes but that haven't + // been returned due to the limit). + // Also note that we only get here once all the results for this node have been returned, and so + // if the node had returned the requested number but we still get there, it imply some results were + // skipped during reconciliation. + if (lastCount == counter.counted() || !counter.isDoneForPartition()) + return null; + lastCount = counter.counted(); + + assert !postReconciliationCounter.isDoneForPartition(); + + // We need to try to query enough additional results to fulfill our query, but because we could still + // get short reads on that additional query, just querying the number of results we miss may not be + // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only + // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result. + // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that + // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n. + // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a + // counting iterator. + int n = postReconciliationCounter.countedInCurrentPartition(); + int x = counter.countedInCurrentPartition(); + int toQuery = Math.max(((n * n) / x) - n, 1); + + DataLimits retryLimits = command.limits().forShortReadRetry(toQuery); + ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); + ClusteringIndexFilter retryFilter = lastClustering == null ? 
filter : filter.forPaging(metadata.comparator, lastClustering, false); + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(), + command.nowInSec(), + command.columnFilter(), + command.rowFilter(), + retryLimits, + partitionKey, + retryFilter); + + return doShortReadRetry(cmd); + } + + private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand) + { + DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1); + ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source)); + if (StorageProxy.canDoLocalRequest(source)) + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); + else + MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler); + + // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. + handler.awaitResults(); + assert resolver.responses.size() == 1; + return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), retryCommand); + } + } + } + + public boolean isDataPresent() + { + return !responses.isEmpty(); + } +}
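
[Editorial aside, not part of the commit] The retry sizing in ShortReadRowProtection.moreContents() above is easy to lose in the surrounding plumbing, so here is a minimal standalone sketch of just the arithmetic its comment describes (m = n^2/x - n). The class and parameter names below are illustrative only; the real code pulls n and x from its two DataLimits.Counter instances and feeds the result into DataLimits.forShortReadRetry() to build a SinglePartitionReadCommand.

// A self-contained sketch of the short-read retry estimate, under the assumptions above.
final class ShortReadRetryEstimate
{
    /**
     * If a replica returned n rows for a partition but only x of them survived
     * post-reconciliation, the observed live ratio is x/n. To recover the n - x rows
     * still missing we ask for m more rows such that m * x/n = n - x,
     * i.e. m = n^2/x - n, with a floor of 1 so the retry always makes progress.
     */
    static int rowsToRequest(int returnedByNode, int survivedReconciliation)
    {
        int n = returnedByNode;
        int x = survivedReconciliation;
        if (x <= 0)
            return Math.max(n, 1); // defensive guard for this sketch only
        return Math.max((n * n) / x - n, 1);
    }

    public static void main(String[] args)
    {
        // Node sent 100 rows, 40 survived reconciliation: request 150 more (100*100/40 - 100).
        System.out.println(rowsToRequest(100, 40));
    }
}
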
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index ea082d5,7b7979d..6610cf7 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -1598,14 -1358,11 +1598,14 @@@ public class StorageProxy implements St throws UnavailableException, ReadFailureException, ReadTimeoutException { long start = System.nanoTime(); - List<Row> rows = null; - try { - rows = fetchRows(commands, consistencyLevel); + PartitionIterator result = fetchRows(group.commands, consistencyLevel); + // If we have more than one command, then despite each read command honoring the limit, the total result + // might not honor it and so we should enforce it + if (group.commands.size() > 1) - result = group.limits().filter(result, group.nowInSec()); ++ result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition()); + return result; } catch (UnavailableException e) { @@@ -1953,218 -1716,267 +1953,220 @@@ } } - public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level) - throws UnavailableException, ReadFailureException, ReadTimeoutException + private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator { - Tracing.trace("Computing ranges to query"); - long startTime = System.nanoTime(); + private final ReadCallback handler; + private PartitionIterator result; - Keyspace keyspace = Keyspace.open(command.keyspace); - List<Row> rows; - // now scan until we have enough results - try + private SingleRangeResponse(ReadCallback handler) { - int liveRowCount = 0; - boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions(); - rows = new ArrayList<>(); + this.handler = handler; + } - // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be - // expensive in clusters with vnodes) - List<? extends AbstractBounds<RowPosition>> ranges; - if (keyspace.getReplicationStrategy() instanceof LocalStrategy) - ranges = command.keyRange.unwrap(); - else - ranges = getRestrictedRanges(command.keyRange); - - // determine the number of rows to be fetched and the concurrency factor - int rowsToBeFetched = command.limit(); - int concurrencyFactor; - if (command.requiresScanningAllRanges()) - { - // all nodes must be queried - rowsToBeFetched *= ranges.size(); - concurrencyFactor = ranges.size(); - logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}", - command.limit(), - ranges.size(), - concurrencyFactor); - Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}", - ranges.size(), concurrencyFactor); + private void waitForResponse() throws ReadTimeoutException + { + if (result != null) + return; + + try + { + result = handler.get(); } - else + catch (DigestMismatchException e) { - // our estimate of how many result rows there will be per-range - float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace); - // underestimate how many rows we will get per-range in order to increase the likelihood that we'll - // fetch enough rows in the first round - resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN; - concurrencyFactor = resultRowsPerRange == 0.0 - ? 
1 - : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange))); - - logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", - resultRowsPerRange, - command.limit(), - ranges.size(), - concurrencyFactor); - Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", - ranges.size(), - concurrencyFactor, - resultRowsPerRange); - } - - boolean haveSufficientRows = false; - int i = 0; - AbstractBounds<RowPosition> nextRange = null; - List<InetAddress> nextEndpoints = null; - List<InetAddress> nextFilteredEndpoints = null; - while (i < ranges.size()) - { - List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor); - int concurrentFetchStartingIndex = i; - int concurrentRequests = 0; - while ((i - concurrentFetchStartingIndex) < concurrencyFactor) - { - AbstractBounds<RowPosition> range = nextRange == null - ? ranges.get(i) - : nextRange; - List<InetAddress> liveEndpoints = nextEndpoints == null - ? getLiveSortedEndpoints(keyspace, range.right) - : nextEndpoints; - List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null - ? consistency_level.filterForQuery(keyspace, liveEndpoints) - : nextFilteredEndpoints; - ++i; - ++concurrentRequests; - - // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take - // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges - // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand. - while (i < ranges.size()) - { - nextRange = ranges.get(i); - nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right); - nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints); - - // If the current range right is the min token, we should stop merging because CFS.getRangeSlice - // don't know how to deal with a wrapping range. - // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps - // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking - // wire compatibility, so It's likely easier not to bother; - if (range.right.isMinimum()) - break; - - List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints); - - // Check if there is enough endpoint for the merge to be possible. 
- if (!consistency_level.isSufficientLiveNodes(keyspace, merged)) - break; - - List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged); - - // Estimate whether merging will be a win or not - if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints)) - break; - - // If we get there, merge this range and the next one - range = range.withNewRight(nextRange.right); - liveEndpoints = merged; - filteredEndpoints = filteredMerged; - ++i; - } - - AbstractRangeCommand nodeCmd = command.forSubRange(range); + throw new AssertionError(e); // no digests in range slices yet + } + } - // collect replies and resolve according to consistency level - RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp); - List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace))); - ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints); - handler.assureSufficientLiveNodes(); - resolver.setSources(filteredEndpoints); - if (filteredEndpoints.size() == 1 - && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())) - { - StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler)); - } - else - { - MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage(); - for (InetAddress endpoint : filteredEndpoints) - { - Tracing.trace("Enqueuing request to {}", endpoint); - MessagingService.instance().sendRRWithFailure(message, endpoint, handler); - } - } - scanHandlers.add(Pair.create(nodeCmd, handler)); - } - Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex); + protected RowIterator computeNext() + { + waitForResponse(); + return result.hasNext() ? result.next() : endOfData(); + } - List<AsyncOneResponse> repairResponses = new ArrayList<>(); - for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers) - { - ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right; - RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver; + public void close() + { + if (result != null) + result.close(); + } + } - try - { - for (Row row : handler.get()) - { - rows.add(row); - if (countLiveRows) - liveRowCount += row.getLiveCount(command.predicate, command.timestamp); - } - repairResponses.addAll(resolver.repairResults); - } - catch (ReadTimeoutException|ReadFailureException ex) - { - // we timed out or failed waiting for responses - int blockFor = consistency_level.blockFor(keyspace); - int responseCount = resolver.responses.size(); - String gotData = responseCount > 0 - ? resolver.isDataPresent() ? " (including data)" : " (only digests)" - : ""; - - boolean isTimeout = ex instanceof ReadTimeoutException; - if (Tracing.isTracing()) - { - Tracing.trace("{}; received {} of {} responses{} for range {} of {}", - (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size()); - } - else if (logger.isDebugEnabled()) - { - logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}", - (isTimeout ? 
"timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size()); - } - throw ex; - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); // no digests in range slices yet - } + private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator + { + private final Iterator<RangeForQuery> ranges; + private final int totalRangeCount; + private final PartitionRangeReadCommand command; + private final Keyspace keyspace; + private final ConsistencyLevel consistency; - // if we're done, great, otherwise, move to the next range - int count = countLiveRows ? liveRowCount : rows.size(); - if (count >= rowsToBeFetched) - { - haveSufficientRows = true; - break; - } - } + private final long startTime; + private DataLimits.Counter counter; + private PartitionIterator sentQueryIterator; - try - { - FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout()); - } - catch (TimeoutException ex) - { - // We got all responses, but timed out while repairing - int blockFor = consistency_level.blockFor(keyspace); - if (Tracing.isTracing()) - Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); - else - logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true); - } + private int concurrencyFactor; + // The two following "metric" are maintained to improve the concurrencyFactor + // when it was not good enough initially. + private int liveReturned; + private int rangesQueried; - if (haveSufficientRows) - return command.postReconciliationProcessing(rows); + public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency) + { + this.command = command; + this.concurrencyFactor = concurrencyFactor; + this.startTime = System.nanoTime(); + this.ranges = new RangeMerger(ranges, keyspace, consistency); + this.totalRangeCount = ranges.rangeCount(); + this.consistency = consistency; + this.keyspace = keyspace; + } - // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor - // based on the results we've seen so far (as long as we still have ranges left to query) - if (i < ranges.size()) + public RowIterator computeNext() + { + try + { + while (sentQueryIterator == null || !sentQueryIterator.hasNext()) { - float fetchedRows = countLiveRows ? 
liveRowCount : rows.size(); - float remainingRows = rowsToBeFetched - fetchedRows; - float actualRowsPerRange; - if (fetchedRows == 0.0) - { - // we haven't actually gotten any results, so query all remaining ranges at once - actualRowsPerRange = 0.0f; - concurrencyFactor = ranges.size() - i; - } - else + // If we don't have more range to handle, we're done + if (!ranges.hasNext()) + return endOfData(); + + // else, sends the next batch of concurrent queries (after having close the previous iterator) + if (sentQueryIterator != null) { - actualRowsPerRange = fetchedRows / i; - concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange))); + liveReturned += counter.counted(); + sentQueryIterator.close(); + + // It's not the first batch of queries and we're not done, so we we can use what has been + // returned so far to improve our rows-per-range estimate and update the concurrency accordingly + updateConcurrencyFactor(); } - logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", - actualRowsPerRange, (int) remainingRows, concurrencyFactor); + sentQueryIterator = sendNextRequests(); } + + return sentQueryIterator.next(); + } + catch (UnavailableException e) + { + rangeMetrics.unavailables.mark(); + throw e; + } + catch (ReadTimeoutException e) + { + rangeMetrics.timeouts.mark(); + throw e; + } + catch (ReadFailureException e) + { + rangeMetrics.failures.mark(); + throw e; } } - catch (ReadTimeoutException e) + + private void updateConcurrencyFactor() { - rangeMetrics.timeouts.mark(); - throw e; + if (liveReturned == 0) + { + // we haven't actually gotten any results, so query all remaining ranges at once + concurrencyFactor = totalRangeCount - rangesQueried; + return; + } + + // Otherwise, compute how many rows per range we got on average and pick a concurrency factor + // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries. 
+ int remainingRows = command.limits().count() - liveReturned; + float rowsPerRange = (float)liveReturned / (float)rangesQueried; + concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange))); + logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", + rowsPerRange, (int) remainingRows, concurrencyFactor); } - catch (UnavailableException e) + + private SingleRangeResponse query(RangeForQuery toQuery) { - rangeMetrics.unavailables.mark(); - throw e; + PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range); + + DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size()); + + int blockFor = consistency.blockFor(keyspace); + int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor); + List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses); + ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints); + + handler.assureSufficientLiveNodes(); + + if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0))) + { + StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler)); + } + else + { + for (InetAddress endpoint : toQuery.filteredEndpoints) + { + MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint)); + Tracing.trace("Enqueuing request to {}", endpoint); + MessagingService.instance().sendRRWithFailure(message, endpoint, handler); + } + } + + return new SingleRangeResponse(handler); } - catch (ReadFailureException e) + + private PartitionIterator sendNextRequests() { - rangeMetrics.failures.mark(); - throw e; + List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor); + for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++) + { + concurrentQueries.add(query(ranges.next())); + ++rangesQueried; + } + + Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size()); + // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to + // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE. 
- counter = DataLimits.NONE.newCounter(command.nowInSec(), true); ++ counter = DataLimits.NONE.newCounter(command.nowInSec(), true, command.selectsFullPartition()); + return counter.applyTo(PartitionIterators.concat(concurrentQueries)); } - finally + + public void close() { - long latency = System.nanoTime() - startTime; - rangeMetrics.addNano(latency); - Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS); + try + { + if (sentQueryIterator != null) + sentQueryIterator.close(); + } + finally + { + long latency = System.nanoTime() - startTime; + rangeMetrics.addNano(latency); + Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS); + } } - return command.postReconciliationProcessing(rows); + } + + @SuppressWarnings("resource") + public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel) + { + Tracing.trace("Computing ranges to query"); + + Keyspace keyspace = Keyspace.open(command.metadata().ksName); + RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel); + + // our estimate of how many result rows there will be per-range + float resultsPerRange = estimateResultsPerRange(command, keyspace); + // underestimate how many rows we will get per-range in order to increase the likelihood that we'll + // fetch enough rows in the first round + resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN; + int concurrencyFactor = resultsPerRange == 0.0 + ? 1 + : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange))); + logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", + resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor); + Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange); + + // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally. 
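
[Editorial aside, not part of the commit] The adaptive batching in RangeCommandIterator.updateConcurrencyFactor() above reduces to a small pure function, sketched here for reference. The class and parameter names are mine, not the patch's; the real method mutates the iterator's concurrencyFactor field from its own liveReturned/rangesQueried counters, and the liveReturned == 0 branch mirrors the "query all remaining ranges at once" case shown in the diff.

// A sketch of the concurrency-factor update, under the assumptions above.
final class ConcurrencyFactorEstimate
{
    static int nextConcurrencyFactor(int limit, int liveReturned, int rangesQueried, int totalRangeCount)
    {
        int rangesLeft = totalRangeCount - rangesQueried;
        // No live rows yet: we have no per-range estimate, so query every remaining range at once.
        if (liveReturned == 0)
            return rangesLeft;

        // Otherwise assume future ranges yield rows at the observed average rate and
        // size the next batch so it should fetch all the rows we still need.
        float rowsPerRange = (float) liveReturned / (float) rangesQueried;
        int remainingRows = limit - liveReturned;
        return Math.max(1, Math.min(rangesLeft, Math.round(remainingRows / rowsPerRange)));
    }

    public static void main(String[] args)
    {
        // LIMIT 100, 30 live rows from 5 ranges so far, 40 ranges total:
        // 6 rows/range observed, 70 rows still needed -> next batch covers 12 ranges.
        System.out.println(nextConcurrencyFactor(100, 30, 5, 40));
    }
}
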
+ - return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec()); ++ return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), ++ command.nowInSec(), ++ command.selectsFullPartition()); } public Map<String, List<String>> getSchemaVersions() http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index 74ec47d,02623eb..ffd1b82 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@@ -29,196 -34,367 +29,196 @@@ import org.apache.cassandra.service.Cli abstract class AbstractQueryPager implements QueryPager { - private static final Logger logger = LoggerFactory.getLogger(AbstractQueryPager.class); - - private final ConsistencyLevel consistencyLevel; - private final boolean localQuery; - - protected final CFMetaData cfm; - protected final IDiskAtomFilter columnFilter; - private final long timestamp; + protected final ReadCommand command; + protected final DataLimits limits; + protected final int protocolVersion; private int remaining; - private boolean exhausted; - private boolean shouldFetchExtraRow; - - protected AbstractQueryPager(ConsistencyLevel consistencyLevel, - int toFetch, - boolean localQuery, - String keyspace, - String columnFamily, - IDiskAtomFilter columnFilter, - long timestamp) - { - this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, timestamp); - } - protected AbstractQueryPager(ConsistencyLevel consistencyLevel, - int toFetch, - boolean localQuery, - CFMetaData cfm, - IDiskAtomFilter columnFilter, - long timestamp) - { - this.consistencyLevel = consistencyLevel; - this.localQuery = localQuery; - - this.cfm = cfm; - this.columnFilter = columnFilter; - this.timestamp = timestamp; - - this.remaining = toFetch; - } - - - public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException - { - if (isExhausted()) - return Collections.emptyList(); - - int currentPageSize = nextPageSize(pageSize); - List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery)); - - if (rows.isEmpty()) - { - logger.debug("Got empty set of rows, considering pager exhausted"); - exhausted = true; - return Collections.emptyList(); - } - - int liveCount = getPageLiveCount(rows); - logger.debug("Fetched {} live rows", liveCount); - - // Because SP.getRangeSlice doesn't trim the result (see SP.trim()), liveCount may be greater than what asked - // (currentPageSize). This would throw off the paging logic so we trim the excess. It's not extremely efficient - // but most of the time there should be nothing or very little to trim. 
- if (liveCount > currentPageSize) - { - rows = discardLast(rows, liveCount - currentPageSize); - liveCount = currentPageSize; - } - - remaining -= liveCount; - - // If we've got less than requested, there is no more query to do (but - // we still need to return the current page) - if (liveCount < currentPageSize) - { - logger.debug("Got result ({}) smaller than page size ({}), considering pager exhausted", liveCount, currentPageSize); - exhausted = true; - } + // This is the last key we've been reading from (or can still be reading within). This the key for + // which remainingInPartition makes sense: if we're starting another key, we should reset remainingInPartition + // (and this is done in PagerIterator). This can be null (when we start). + private DecoratedKey lastKey; + private int remainingInPartition; - // If it's not the first query and the first column is the last one returned (likely - // but not certain since paging can race with deletes/expiration), then remove the - // first column. - if (containsPreviousLast(rows.get(0))) - { - rows = discardFirst(rows); - remaining++; - } - // Otherwise, if 'shouldFetchExtraRow' was set, we queried for one more than the page size, - // so if the page is full, trim the last entry - else if (shouldFetchExtraRow && !exhausted) - { - // We've asked for one more than necessary - rows = discardLast(rows); - remaining++; - } - - logger.debug("Remaining rows to page: {}", remaining); - - if (!isExhausted()) - shouldFetchExtraRow = recordLast(rows.get(rows.size() - 1)); - - return rows; - } + private boolean exhausted; - private List<Row> filterEmpty(List<Row> result) + protected AbstractQueryPager(ReadCommand command, int protocolVersion) { - for (Row row : result) - { - if (row.cf == null || !row.cf.hasColumns()) - { - List<Row> newResult = new ArrayList<Row>(result.size() - 1); - for (Row row2 : result) - { - if (row2.cf == null || !row2.cf.hasColumns()) - continue; + this.command = command; + this.protocolVersion = protocolVersion; + this.limits = command.limits(); - newResult.add(row2); - } - return newResult; - } - } - return result; + this.remaining = limits.count(); + this.remainingInPartition = limits.perPartitionCount(); } - protected void restoreState(int remaining, boolean shouldFetchExtraRow) + public ReadOrderGroup startOrderGroup() { - this.remaining = remaining; - this.shouldFetchExtraRow = shouldFetchExtraRow; + return command.startOrderGroup(); } - public boolean isExhausted() + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException { - return exhausted || remaining == 0; - } + if (isExhausted()) + return EmptyIterators.partition(); - public int maxRemaining() - { - return remaining; + pageSize = Math.min(pageSize, remaining); + Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec()); + return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager); } - public long timestamp() + public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException { - return timestamp; - } + if (isExhausted()) + return EmptyIterators.partition(); - private int nextPageSize(int pageSize) - { - return Math.min(remaining, pageSize) + (shouldFetchExtraRow ? 
1 : 0); + pageSize = Math.min(pageSize, remaining); + RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec()); + return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup), pager); } - public ColumnCounter columnCounter() + public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize, ReadOrderGroup orderGroup) { - return columnFilter.columnCounter(cfm.comparator, timestamp); - } - - protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException; - - /** - * Checks to see if the first row of a new page contains the last row from the previous page. - * @param first the first row of the new page - * @return true if <code>first</code> contains the last from from the previous page and it is live, false otherwise - */ - protected abstract boolean containsPreviousLast(Row first); - - /** - * Saves the paging state by recording the last seen partition key and cell name (where applicable). - * @param last the last row in the current page - * @return true if an extra row should be fetched in the next page,false otherwise - */ - protected abstract boolean recordLast(Row last); + if (isExhausted()) + return EmptyIterators.unfilteredPartition(cfm, false); - protected abstract boolean isReversed(); + pageSize = Math.min(pageSize, remaining); + UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec()); - private List<Row> discardFirst(List<Row> rows) - { - return discardFirst(rows, 1); + return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(orderGroup), pager); } - @VisibleForTesting - List<Row> discardFirst(List<Row> rows, int toDiscard) + private class UnfilteredPager extends Pager<Unfiltered> { - if (toDiscard == 0 || rows.isEmpty()) - return rows; - int i = 0; - DecoratedKey firstKey = null; - ColumnFamily firstCf = null; - while (toDiscard > 0 && i < rows.size()) + private UnfilteredPager(DataLimits pageLimits, int nowInSec) { - Row first = rows.get(i++); - firstKey = first.key; - firstCf = first.cf.cloneMeShallow(isReversed()); - toDiscard -= isReversed() - ? discardLast(first.cf, toDiscard, firstCf) - : discardFirst(first.cf, toDiscard, firstCf); + super(pageLimits, nowInSec); } - // If there is less live data than to discard, all is discarded - if (toDiscard > 0) - return Collections.<Row>emptyList(); - - // i is the index of the first row that we are sure to keep. On top of that, - // we also keep firstCf is it hasn't been fully emptied by the last iteration above. - int count = firstCf.getColumnCount(); - int newSize = rows.size() - (count == 0 ? 
i : i - 1); - List<Row> newRows = new ArrayList<Row>(newSize); - if (count != 0) - newRows.add(new Row(firstKey, firstCf)); - newRows.addAll(rows.subList(i, rows.size())); - - return newRows; - } - - private List<Row> discardLast(List<Row> rows) - { - return discardLast(rows, 1); + protected BaseRowIterator<Unfiltered> apply(BaseRowIterator<Unfiltered> partition) + { + return Transformation.apply(counter.applyTo((UnfilteredRowIterator) partition), this); + } } - @VisibleForTesting - List<Row> discardLast(List<Row> rows, int toDiscard) + private class RowPager extends Pager<Row> { - if (toDiscard == 0 || rows.isEmpty()) - return rows; - int i = rows.size()-1; - DecoratedKey lastKey = null; - ColumnFamily lastCf = null; - while (toDiscard > 0 && i >= 0) + private RowPager(DataLimits pageLimits, int nowInSec) { - Row last = rows.get(i--); - lastKey = last.key; - lastCf = last.cf.cloneMeShallow(isReversed()); - toDiscard -= isReversed() - ? discardFirst(last.cf, toDiscard, lastCf) - : discardLast(last.cf, toDiscard, lastCf); + super(pageLimits, nowInSec); } - // If there is less live data than to discard, all is discarded - if (toDiscard > 0) - return Collections.<Row>emptyList(); - - // i is the index of the last row that we are sure to keep. On top of that, - // we also keep lastCf is it hasn't been fully emptied by the last iteration above. - int count = lastCf.getColumnCount(); - int newSize = count == 0 ? i+1 : i+2; - List<Row> newRows = new ArrayList<Row>(newSize); - newRows.addAll(rows.subList(0, i+1)); - if (count != 0) - newRows.add(new Row(lastKey, lastCf)); - - return newRows; + protected BaseRowIterator<Row> apply(BaseRowIterator<Row> partition) + { + return Transformation.apply(counter.applyTo((RowIterator) partition), this); + } } - private int getPageLiveCount(List<Row> page) + private abstract class Pager<T extends Unfiltered> extends Transformation<BaseRowIterator<T>> { - int count = 0; - for (Row row : page) - count += columnCounter().countAll(row.cf).live(); - return count; - } + private final DataLimits pageLimits; + protected final DataLimits.Counter counter; + private Row lastRow; + private boolean isFirstPartition = true; - private int discardFirst(ColumnFamily cf, int toDiscard, ColumnFamily newCf) - { - boolean isReversed = isReversed(); - DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed); - return isReversed - ? discardTail(cf, toDiscard, newCf, cf.reverseIterator(), tester) - : discardHead(toDiscard, newCf, cf.iterator(), tester); - } + private Pager(DataLimits pageLimits, int nowInSec) + { - this.counter = pageLimits.newCounter(nowInSec, true); ++ this.counter = pageLimits.newCounter(nowInSec, true, command.selectsFullPartition()); + this.pageLimits = pageLimits; + } - private int discardLast(ColumnFamily cf, int toDiscard, ColumnFamily newCf) - { - boolean isReversed = isReversed(); - DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed); - return isReversed - ? discardHead(toDiscard, newCf, cf.reverseIterator(), tester) - : discardTail(cf, toDiscard, newCf, cf.iterator(), tester); - } + @Override + public BaseRowIterator<T> applyToPartition(BaseRowIterator<T> partition) + { + DecoratedKey key = partition.partitionKey(); + if (lastKey == null || !lastKey.equals(key)) + remainingInPartition = limits.perPartitionCount(); + lastKey = key; + + // If this is the first partition of this page, this could be the continuation of a partition we've started + // on the previous page. 
In which case, we could have the problem that the partition has no more "regular" + // rows (but the page size is such we didn't knew before) but it does has a static row. We should then skip + // the partition as returning it would means to the upper layer that the partition has "only" static columns, + // which is not the case (and we know the static results have been sent on the previous page). + if (isFirstPartition) + { + isFirstPartition = false; + if (isPreviouslyReturnedPartition(key) && !partition.hasNext()) + { + partition.close(); + return null; + } + } - private int discardHead(int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester) - { - ColumnCounter counter = columnCounter(); + return apply(partition); + } - List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size()); + protected abstract BaseRowIterator<T> apply(BaseRowIterator<T> partition); - // Discard the first 'toDiscard' live, non-static cells - while (iter.hasNext()) + @Override + public void onClose() { - Cell c = iter.next(); - - // if it's a static column, don't count it and save it to add to the trimmed results - ColumnDefinition columnDef = cfm.getColumnDefinition(c.name()); - if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC) + recordLast(lastKey, lastRow); + + int counted = counter.counted(); + remaining -= counted; + // If the clustering of the last row returned is a static one, it means that the partition was only + // containing data within the static columns. If the clustering of the last row returned is empty + // it means that there is only one row per partition. Therefore, in both cases there are no data remaining + // within the partition. + if (lastRow != null && (lastRow.clustering() == Clustering.STATIC_CLUSTERING + || lastRow.clustering() == Clustering.EMPTY)) { - staticCells.add(c); - continue; + remainingInPartition = 0; } - - counter.count(c, tester); - - // once we've discarded the required amount, add the rest - if (counter.live() > toDiscard) + else { - for (Cell staticCell : staticCells) - copy.addColumn(staticCell); - - copy.addColumn(c); - while (iter.hasNext()) - copy.addColumn(iter.next()); + remainingInPartition -= counter.countedInCurrentPartition(); } + exhausted = counted < pageLimits.count(); } - int live = counter.live(); - // We want to take into account the row even if it was containing only static columns - if (live == 0 && !staticCells.isEmpty()) - live = 1; - return Math.min(live, toDiscard); - } - - private int discardTail(ColumnFamily cf, int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester) - { - // Redoing the counting like that is not extremely efficient. - // This is called only for reversed slices or in the case of a race between - // paging and a deletion (pretty unlikely), so this is probably acceptable. 
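
[Editorial aside, not part of the commit] The per-page bookkeeping done in the new AbstractQueryPager.Pager.onClose() above can be summarised with plain fields, as in the sketch below. The names are illustrative; the Clustering.STATIC_CLUSTERING / Clustering.EMPTY comparison from the patch is represented here by a boolean flag.

// A sketch of the page-close bookkeeping, under the assumptions above.
final class PageBookkeepingSketch
{
    int remaining;              // rows still allowed for the whole query
    int remainingInPartition;   // rows still allowed in the partition we stopped in
    boolean exhausted;

    void onPageClosed(int countedThisPage, int countedInLastPartition,
                      boolean lastRowWasStaticOrEmptyClustering, int pageLimit)
    {
        remaining -= countedThisPage;
        // A static or empty clustering on the last row means the partition had no further
        // regular rows, so nothing is left to page within it; otherwise subtract what was used.
        if (lastRowWasStaticOrEmptyClustering)
            remainingInPartition = 0;
        else
            remainingInPartition -= countedInLastPartition;
        // A short page means the underlying command ran out of data.
        exhausted = countedThisPage < pageLimit;
    }

    public static void main(String[] args)
    {
        PageBookkeepingSketch s = new PageBookkeepingSketch();
        s.remaining = 100;
        s.remainingInPartition = 10;
        // Page limit 20, only 15 rows returned, 3 of them in the last partition seen.
        s.onPageClosed(15, 3, false, 20);
        System.out.println(s.remaining + " " + s.remainingInPartition + " " + s.exhausted); // 85 7 true
    }
}
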
- int liveCount = columnCounter().countAll(cf).live(); - if (liveCount == toDiscard) - return toDiscard; - - ColumnCounter counter = columnCounter(); - // Discard the last 'toDiscard' live (so stop adding as sound as we're past 'liveCount - toDiscard') - while (iter.hasNext()) + public Row applyToStatic(Row row) { - Cell c = iter.next(); - counter.count(c, tester); - if (counter.live() > liveCount - toDiscard) - break; + if (!row.isEmpty()) + lastRow = row; + return row; + } - copy.addColumn(c); + @Override + public Row applyToRow(Row row) + { + lastRow = row; + return row; } - return Math.min(liveCount, toDiscard); } - /** - * Returns the first non-static cell in the ColumnFamily. This is necessary to avoid recording a static column - * as the "last" cell seen in a reversed query. Because we will always query static columns alongside the normal - * data for a page, they are not a good indicator of where paging should resume. When we begin the next page, we - * need to start from the last non-static cell. - */ - protected Cell firstNonStaticCell(ColumnFamily cf) + protected void restoreState(DecoratedKey lastKey, int remaining, int remainingInPartition) { - for (Cell cell : cf) - { - ColumnDefinition def = cfm.getColumnDefinition(cell.name()); - if (def == null || def.kind != ColumnDefinition.Kind.STATIC) - return cell; - } - return null; + this.lastKey = lastKey; + this.remaining = remaining; + this.remainingInPartition = remainingInPartition; } - protected static Cell lastCell(ColumnFamily cf) + public boolean isExhausted() { - return cf.getReverseSortedColumns().iterator().next(); + return exhausted || remaining == 0 || ((this instanceof SinglePartitionPager) && remainingInPartition == 0); } + + public int maxRemaining() + { + return remaining; + } + + protected int remainingInPartition() + { + return remainingInPartition; + } + + protected abstract ReadCommand nextPageReadCommand(int pageSize); + protected abstract void recordLast(DecoratedKey key, Row row); + protected abstract boolean isPreviouslyReturnedPartition(DecoratedKey key); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java index 8caa14d,35d0971..11bbc0e --- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java @@@ -38,24 -36,19 +38,26 @@@ import org.apache.cassandra.service.Cli * * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not * create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the - * cfs meanRowSize to decide if parallelizing some of the command might be worth it while being confident we don't + * cfs meanPartitionSize to decide if parallelizing some of the command might be worth it while being confident we don't * blow out memory. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 8caa14d,35d0971..11bbc0e
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@@ -38,24 -36,19 +38,26 @@@ import org.apache.cassandra.service.Cli
*
* For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not
* create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the
- * cfs meanRowSize to decide if parallelizing some of the command might be worth it while being confident we don't
+ * cfs meanPartitionSize to decide if parallelizing some of the command might be worth it while being confident we don't
+ * blow out memory.
*/
-class MultiPartitionPager implements QueryPager
+public class MultiPartitionPager implements QueryPager
{
private final SinglePartitionPager[] pagers;
- private final long timestamp;
+ private final DataLimits limit;
++ private final boolean selectsFullPartitions;
+
+ private final int nowInSec;
private int remaining;
private int current;
- MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state, int limitForQuery)
+ public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, int protocolVersion)
{
+ this.limit = group.limits();
+ this.nowInSec = group.nowInSec();
++ this.selectsFullPartitions = group.selectsFullPartition();
+ int i = 0;
// If it's not the beginning (state != null), we need to find where we were and skip previous commands
// since they are done.
@@@ -70,15 -63,29 +72,16 @@@
return;
}
- pagers = new SinglePartitionPager[commands.size() - i];
+ pagers = new SinglePartitionPager[group.commands.size() - i];
// 'i' is on the first non exhausted pager for the previous page (or the first one)
- pagers[0] = group.commands.get(i).getPager(state, protocolVersion);
- pagers[0] = makePager(commands.get(i), consistencyLevel, cState, localQuery, state);
- timestamp = commands.get(i).timestamp;
++ SinglePartitionReadCommand command = group.commands.get(i);
++ pagers[0] = command.getPager(state, protocolVersion);
// Following ones haven't been started yet
- for (int j = i + 1; j < commands.size(); j++)
- {
- ReadCommand command = commands.get(j);
- if (command.timestamp != timestamp)
- throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
- pagers[j - i] = makePager(command, consistencyLevel, cState, localQuery, null);
- }
+ for (int j = i + 1; j < group.commands.size(); j++)
+ pagers[j - i] = group.commands.get(j).getPager(null, protocolVersion);
- remaining = state == null ? limitForQuery : state.remaining;
- }
-
- private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state)
- {
- return command instanceof SliceFromReadCommand
- ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, localQuery, state)
- : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, localQuery);
+ remaining = state == null ? limit.count() : state.remaining;
}
public PagingState state()
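The constructor above resumes from a PagingState by skipping the commands that previous pages already exhausted and handing the saved state only to the first remaining command. The sketch below restates that resume shape with hypothetical types (SubQuery, SubPager, SavedState); the skip loop and the key comparison are assumptions made for illustration, not the project's code.

import java.util.List;

// Hypothetical sketch of resuming a multi-partition pager from saved paging state.
final class ResumeSketch
{
    interface SubPager { }
    interface SavedState { Object partitionKey(); }
    interface SubQuery
    {
        Object partitionKey();
        SubPager pager(SavedState state); // state may be null for queries not yet started
    }

    static SubPager[] buildPagers(List<SubQuery> queries, SavedState state)
    {
        int i = 0;
        // Skip the sub-queries that earlier pages finished; the saved state names the
        // partition the previous page stopped in.
        if (state != null)
            while (i < queries.size() && !queries.get(i).partitionKey().equals(state.partitionKey()))
                i++;

        // Assumes at least one sub-query is left; a caller would handle the fully exhausted case separately.
        SubPager[] pagers = new SubPager[queries.size() - i];
        // Only the first remaining sub-query carries over the saved state...
        pagers[0] = queries.get(i).pager(state);
        // ...the following ones have not been started yet.
        for (int j = i + 1; j < queries.size(); j++)
            pagers[j - i] = queries.get(j).pager(null);
        return pagers;
    }
}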
@@@ -106,93 -113,35 +109,93 @@@
return true;
}
- public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ public ReadOrderGroup startOrderGroup()
{
- List<Row> result = new ArrayList<Row>();
-
- int remainingThisQuery = Math.min(remaining, pageSize);
- while (remainingThisQuery > 0 && !isExhausted())
+ // Note that for all pagers, the only difference is the partition key to which each applies, so in practice we
+ // can use any of the sub-pagers' ReadOrderGroup to protect the whole pager
+ for (int i = current; i < pagers.length; i++)
{
- // isExhausted has set us on the first non-exhausted pager
- List<Row> page = pagers[current].fetchPage(remainingThisQuery);
- if (page.isEmpty())
- continue;
-
- Row row = page.get(0);
- int fetched = pagers[current].columnCounter().countAll(row.cf).live();
- remaining -= fetched;
- remainingThisQuery -= fetched;
- result.add(row);
+ if (pagers[i] != null)
+ return pagers[i].startOrderGroup();
}
+ throw new AssertionError("Shouldn't be called on an exhausted pager");
+ }
- return result;
+ @SuppressWarnings("resource") // iter closed via countingIter
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+ {
+ int toQuery = Math.min(remaining, pageSize);
+ PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null);
- DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
++ DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions);
+ iter.setCounter(counter);
+ return counter.applyTo(iter);
}
- public int maxRemaining()
+ @SuppressWarnings("resource") // iter closed via countingIter
+ public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
{
- return remaining;
+ int toQuery = Math.min(remaining, pageSize);
+ PagersIterator iter = new PagersIterator(toQuery, null, null, orderGroup);
- DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
++ DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions);
+ iter.setCounter(counter);
+ return counter.applyTo(iter);
}
- public long timestamp()
+ private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator
{
- return timestamp;
+ private final int pageSize;
+ private PartitionIterator result;
+ private DataLimits.Counter counter;
+
+ // For "normal" queries
+ private final ConsistencyLevel consistency;
+ private final ClientState clientState;
+
+ // For internal queries
+ private final ReadOrderGroup orderGroup;
+
+ public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadOrderGroup orderGroup)
+ {
+ this.pageSize = pageSize;
+ this.consistency = consistency;
+ this.clientState = clientState;
+ this.orderGroup = orderGroup;
+ }
+
+ public void setCounter(DataLimits.Counter counter)
+ {
+ this.counter = counter;
+ }
+
+ protected RowIterator computeNext()
+ {
+ while (result == null || !result.hasNext())
+ {
+ if (result != null)
+ result.close();
+
+ // This sets us on the first non-exhausted pager
+ if (isExhausted())
+ return endOfData();
+
+ int toQuery = pageSize - counter.counted();
+ result = consistency == null
+ ? pagers[current].fetchPageInternal(toQuery, orderGroup)
+ : pagers[current].fetchPage(toQuery, consistency, clientState);
+ }
+ return result.next();
+ }
+
+ public void close()
+ {
+ remaining -= counter.counted();
+ if (result != null)
+ result.close();
+ }
+ }
+
+ public int maxRemaining()
+ {
+ return remaining;
}
}
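PagersIterator above fills one page from several single-partition pagers, asking each successive pager only for what is left of the page budget (pageSize - counter.counted()). The stripped-down sketch below shows the same chaining with hypothetical Pager and row types; it ignores consistency levels, order groups and resource handling. The point is that the shared budget, not any individual pager, decides when the page is full.

import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch of filling one page from several per-partition pagers
// under a single shared budget, in the spirit of PagersIterator above.
final class ChainedPageSketch implements Iterator<String>
{
    interface Pager
    {
        // Returns at most 'max' rows; an empty iterator means this pager has nothing left.
        Iterator<String> fetch(int max);
    }

    private final Pager[] pagers;
    private final int pageSize;
    private Iterator<String> rows;
    private int counted; // rows already handed out for this page
    private int current; // index of the next pager to drain

    ChainedPageSketch(Pager[] pagers, int pageSize)
    {
        this.pagers = pagers;
        this.pageSize = pageSize;
    }

    @Override
    public boolean hasNext()
    {
        while (counted < pageSize)
        {
            if (rows != null && rows.hasNext())
                return true;
            if (current == pagers.length)
                return false;
            // Ask the next pager only for what is left of the page budget.
            rows = pagers[current++].fetch(pageSize - counted);
        }
        return false;
    }

    @Override
    public String next()
    {
        if (!hasNext())
            throw new NoSuchElementException();
        counted++;
        return rows.next();
    }
}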
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 02b5de2,f933ccb..c26bf3f
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@@ -36,30 -41,156 +36,30 @@@ public class QueryPager
/**
* Convenience method that count (live) cells/rows for a given slice of a row, but page underneath.
*/
- public static int countPaged(String keyspace,
- String columnFamily,
- ByteBuffer key,
- SliceQueryFilter filter,
+ public static int countPaged(CFMetaData metadata,
+ DecoratedKey key,
+ ColumnFilter columnFilter,
+ ClusteringIndexFilter filter,
+ DataLimits limits,
ConsistencyLevel consistencyLevel,
- ClientState cState,
+ ClientState state,
final int pageSize,
- long now) throws RequestValidationException, RequestExecutionException
+ int nowInSec,
+ boolean isForThrift) throws RequestValidationException, RequestExecutionException
{
- SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
- final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, cState, false);
+ SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
+ final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION);
- ColumnCounter counter = filter.columnCounter(Schema.instance.getCFMetaData(keyspace, columnFamily).comparator, now);
+ int count = 0;
while (!pager.isExhausted())
{
- List<Row> next = pager.fetchPage(pageSize);
- if (!next.isEmpty())
- counter.countAll(next.get(0).cf);
+ try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state))
+ {
- DataLimits.Counter counter = limits.newCounter(nowInSec, true);
++ DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition());
+ PartitionIterators.consume(counter.applyTo(iter));
+ count += counter.counted();
+ }
}
- return counter.live();
+ return count;
}
}
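countPaged above now counts by paging: it keeps fetching pages until the pager reports itself exhausted, drains each page through a fresh counter, and accumulates the counted rows. A reduced sketch of that loop, with hypothetical Pager and Page types standing in for the project's API:

// Hypothetical count-by-paging loop mirroring the shape of countPaged above.
final class PagedCountSketch
{
    interface Page extends AutoCloseable
    {
        boolean advance();      // move to the next live row, false once the page is drained
        @Override void close(); // no checked exception, so try-with-resources stays simple
    }

    interface Pager
    {
        boolean isExhausted();
        Page fetchPage(int pageSize);
    }

    static int countPaged(Pager pager, int pageSize)
    {
        int count = 0;
        while (!pager.isExhausted())
        {
            // Each page is consumed fully so the pager can record where to resume,
            // much like the PartitionIterator is drained through the counter above.
            try (Page page = pager.fetchPage(pageSize))
            {
                while (page.advance())
                    count++;
            }
        }
        return count;
    }
}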
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index 11d9e19,416a4b2..ba23c67
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -716,17 -549,14 +716,17 @@@ public abstract class CQLTeste
protected com.datastax.driver.core.ResultSet executeNet(int protocolVersion, String query, Object... values) throws Throwable
{
- requireNetwork();
-
- return session[protocolVersion-1].execute(formatQuery(query), values);
+ return sessionNet(protocolVersion).execute(formatQuery(query), values);
}
- protected Session sessionNet()
+ protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable
{
- return sessionNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1));
- return sessionNet(maxProtocolVersion).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
++ return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
+ }
+
- protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable
++ protected Session sessionNet()
+ {
- return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
++ return sessionNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1));
}
protected Session sessionNet(int protocolVersion)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org