ifesdjeen commented on code in PR #4738:
URL: https://github.com/apache/cassandra/pull/4738#discussion_r3209791660
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordCommandStoreTryExecuteListeningTest.java:
##########
@@ -124,26 +133,70 @@ public void testTryExecuteListening() throws Throwable
}
}
- private static Command executed(Node node, SaveStatus saveStatus, TxnId
... dependencies)
+ private static Command executed(Node node, SaveStatus saveStatus, Object
... inputs)
{
- PartitionKey key = pk(1, "ks", "tbl");
+ int depCount;
+ Map<PartitionKey, List<TxnId>> depsByInputKey = new TreeMap<>();
+ TxnId[] txnIds;
+ {
+ PartitionKey k = null;
+ for (Object input : inputs)
+ {
+ if (input instanceof Integer)
+ {
+ k = keyN((Integer) input, node);
+ depsByInputKey.put(k, new ArrayList<>());
+ }
+ else depsByInputKey.get(k).add((TxnId)input);
+ }
+ txnIds =
depsByInputKey.values().stream().flatMap(Collection::stream).distinct().sorted().toArray(TxnId[]::new);
+ depCount =
depsByInputKey.values().stream().mapToInt(Collection::size).sum();
+ }
+ PartitionKey[] keys =
depsByInputKey.keySet().toArray(PartitionKey[]::new);
+ Range[] ranges =
Stream.of(keys).map(PartitionKey::asRange).toArray(Range[]::new);
+
+ PartitionKey key = keys[0];
+ AccordCommandStore commandStore = (AccordCommandStore)
node.commandStores().unsafeForKey(key.toUnseekable());
+
Txn txn = node.agent().emptySystemTxn(Txn.Kind.ExclusiveSyncPoint,
Routable.Domain.Range);
TxnId txnId = node.nextTxnId(txn);
FullRoute<?> route;
- try { route = node.computeRoute(txnId, Ranges.of(key.asRange())); }
+ try { route = node.computeRoute(txnId, Ranges.of(ranges)); }
catch (TopologyException e) { throw new RuntimeException(e); }
- AccordCommandStore commandStore = (AccordCommandStore)
node.commandStores().unsafeForKey(key.toUnseekable());
- int[] rangesToTxnIds = new int[dependencies.length + 1];
- rangesToTxnIds[0] = rangesToTxnIds.length;
- for (int i = 1; i < rangesToTxnIds.length ; ++i)
- rangesToTxnIds[i] = i - 1;
- Deps deps = new Deps(KeyDeps.NONE,
RangeDeps.SerializerSupport.create(new accord.primitives.Range[] {
key.asRange() }, dependencies, rangesToTxnIds, null));
+ int[] rangesToTxnIds = new int[depCount + ranges.length];
+ {
+ int offset = ranges.length;
+ for (int i = 0 ; i < ranges.length ; ++i)
+ {
+ for (TxnId dep : depsByInputKey.get(keys[i]))
+ rangesToTxnIds[offset++] = Arrays.binarySearch(txnIds,
dep);
+ rangesToTxnIds[i] = offset;
+ }
+ }
+ Deps deps = new Deps(KeyDeps.NONE,
RangeDeps.SerializerSupport.create(ranges, txnIds, rangesToTxnIds, null));
Command.WaitingOn waitingOn; {
- LargeBitSet waitingOnBits = new LargeBitSet(dependencies.length);
- waitingOnBits.setRange(0, dependencies.length);
- waitingOn = new Command.WaitingOn(RoutingKeys.EMPTY,
deps.rangeDeps, new ImmutableBitSet(waitingOnBits), new
ImmutableBitSet(dependencies.length));
+ LargeBitSet waitingOnBits = new LargeBitSet(txnIds.length);
+ waitingOnBits.setRange(0, txnIds.length);
+ waitingOn = new Command.WaitingOn(RoutingKeys.EMPTY,
deps.rangeDeps, new ImmutableBitSet(waitingOnBits), new
ImmutableBitSet(txnIds.length));
}
return Command.Executed.executed(txnId, saveStatus,
Status.Durability.NotDurable,
StoreParticipants.execute(commandStore.unsafeGetRangesForEpoch(), route, txnId,
txnId.epoch()), Ballot.ZERO, txnId, txn.intersecting(route, true),
deps.intersecting(route), Ballot.ZERO, waitingOn, null,
ResultSerializers.APPLIED);
}
+ private static PartitionKey keyN(int n, Node node)
+ {
+ PartitionKey first = pk(1, "ks", "tbl");
+ if (n == 1)
+ return first;
+
+ AccordCommandStore commandStore = (AccordCommandStore)
node.commandStores().unsafeForKey(first.toUnseekable());
+
+ int i = 2;
+ while (true)
+ {
+ PartitionKey next = pk(i, "ks", "tbl");
Review Comment:
Did you want to iterate through different keys there?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]