JeetKunDoug commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1763592085
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java: ########## @@ -161,40 +160,22 @@ public void checkBulkWriterIsEnabledOrThrow() } /** - * @return the largest time skew retrieved from the target replicas + * @return the largest time skew retrieved from the target clusters */ @Override - public TimeSkewResponse getTimeSkew(List<RingInstance> instances) + public TimeSkewResponse timeSkew(Range<BigInteger> range) { if (clusterInfos.size() == 1) { - return clusterInfos.get(0).getTimeSkew(instances); + return clusterInfos.get(0).timeSkew(range); } - Map<String, List<RingInstance>> instancesByClusterId = instances.stream().collect(Collectors.groupingBy(instance -> { - String clusterId = instance.clusterId(); - Preconditions.checkState(clusterId != null, - "RingInstance must define its clusterId for coordinated write"); - return clusterId; - })); - long localNow = System.currentTimeMillis(); - long maxDiff = 0; - TimeSkewResponse largestSkew = null; - for (Map.Entry<String, List<RingInstance>> entry : instancesByClusterId.entrySet()) - { - String clusterId = entry.getKey(); - List<RingInstance> instancesOfCluster = entry.getValue(); - ClusterInfo clusterInfo = cluster(clusterId); - Preconditions.checkState(clusterInfo != null, "ClusterInfo not found with clusterId: " + clusterId); - TimeSkewResponse response = clusterInfo.getTimeSkew(instancesOfCluster); - long d = Math.abs(response.currentTime - localNow); - if (Math.abs(response.currentTime - localNow) > maxDiff) - { - maxDiff = d; - largestSkew = response; - } - } - return largestSkew; + return clusterInfos.stream() + .map(clusterInfo -> clusterInfo.timeSkew(range)) + // Find the timeSkew with the lowest remote currentTime, i.e. largest difference with the local current time. 
Review Comment: This isn't really right, as it would be possible for the current time on a remote host to be much _greater_ than the current time on the host running the task. We really need to do the whole "take the absolute value of the difference between the times" thing, not just look for the min currentTime. I also realize that in multi-cluster scenarios the other field in the TimeSkewResponse (`allowableSkewInMinutes`) can, in fact, differ between clusters (although in practice I doubt it would). We should really grab 2 things: 1) The time skew response with the maximum _difference_ between localNow and the remote time, where localNow is calculated each time you get a response back (per my previous suggestion); 2) The _minimum_ `allowableSkewInMinutes` value. Then craft the "combined" time skew response from those two things. ########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroupTest.java: ########## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; +import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo; +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.RingInstance; +import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils; +import org.apache.cassandra.spark.bulkwriter.WriteAvailability; +import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.data.partitioner.Partitioner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class CassandraClusterInfoGroupTest +{ + @Test + void testCreateGroupFailWithEmptyList() + { + assertThatThrownBy(() -> new CassandraClusterInfoGroup(null)) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("clusterInfos cannot be null or empty"); + + assertThatThrownBy(() -> new CassandraClusterInfoGroup(Collections.emptyList())) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("clusterInfos cannot be null or empty"); + } + + @Test + void testLookupCluster() + { + CassandraClusterInfoGroup group = mockClusterGroup(2, index -> mockClusterInfo("cluster" + index)); + List<ClusterInfo> clusters = 
group.clusters(); + assertThat(clusters).hasSize(2); + assertThat(group.cluster("cluster0")).isSameAs(clusters.get(0)); + assertThat(group.cluster("cluster1")).isSameAs(clusters.get(1)); + assertThat(group.cluster("cluster2")).isNull(); + } + + @Test + void testClusterId() + { + CassandraClusterInfoGroup group = mockClusterGroup(2, index -> mockClusterInfo("cluster" + index)); + assertThat(group.clusterId()).isEqualTo("ClusterInfoGroup: [cluster0, cluster1]"); + + group = mockClusterGroup(1, index -> mockClusterInfo("cluster" + index)); + assertThat(group.clusterId()).isEqualTo("ClusterInfoGroup: [cluster0]"); + } + + @Test + void testDelegationOfSingleCluster() + { + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster0"); + TokenRangeReplicasResponse response = TokenRangeMappingUtils.mockSimpleTokenRangeReplicasResponse(10, 3); + TokenRangeMapping<RingInstance> expectedTokenRangeMapping = TokenRangeMapping.create(() -> response, + () -> Partitioner.Murmur3Partitioner, + RingInstance::new); + when(clusterInfo.getTokenRangeMapping(anyBoolean())).thenReturn(expectedTokenRangeMapping); + when(clusterInfo.timeSkew(any())).thenReturn(mock(TimeSkewResponse.class)); + when(clusterInfo.getLowestCassandraVersion()).thenReturn("lowestCassandraVersion"); + when(clusterInfo.clusterWriteAvailability()).thenReturn(Collections.emptyMap()); + CassandraClusterInfoGroup group = mockClusterGroup(1, index -> clusterInfo); + // Since there is a single clusterInfo in the group. 
It behaves as a simple delegation to the sole clusterInfo + assertThat(group.clusterWriteAvailability()).isSameAs(clusterInfo.clusterWriteAvailability()); + assertThat(group.timeSkew(null)).isSameAs(clusterInfo.timeSkew(null)); + assertThat(group.getLowestCassandraVersion()).isSameAs(clusterInfo.getLowestCassandraVersion()); + assertThat(group.getTokenRangeMapping(true)).isSameAs(clusterInfo.getTokenRangeMapping(true)); + } + + @Test + void testAggregatePartitioner() + { + CassandraClusterInfoGroup invalidGroup = mockClusterGroup(2, index -> { + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index); + Partitioner partitioner = (index % 2 == 0) ? Partitioner.Murmur3Partitioner : Partitioner.RandomPartitioner; + when(clusterInfo.getPartitioner()).thenReturn(partitioner); + return clusterInfo; + }); + assertThatThrownBy(invalidGroup::getPartitioner) + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage("Clusters are not running with the same partitioner kind. Found partitioners: " + + "{cluster0=org.apache.cassandra.dht.Murmur3Partitioner, cluster1=org.apache.cassandra.dht.RandomPartitioner}"); + + CassandraClusterInfoGroup goodGroup = mockClusterGroup(2, index -> { + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index); + when(clusterInfo.getPartitioner()).thenReturn(Partitioner.Murmur3Partitioner); + return clusterInfo; + }); + assertThat(goodGroup.getPartitioner()).isEqualTo(Partitioner.Murmur3Partitioner); + } + + @Test + void testAggregateWriteAvailability() + { + Map<RingInstance, WriteAvailability> cluster1Availability = ImmutableMap.of(mock(RingInstance.class), WriteAvailability.AVAILABLE); + Map<RingInstance, WriteAvailability> cluster2Availability = ImmutableMap.of(mock(RingInstance.class), WriteAvailability.AVAILABLE); + CassandraClusterInfoGroup group = mockClusterGroup(2, index -> { + Map<RingInstance, WriteAvailability> availability = (index % 2 == 0) ? 
cluster1Availability : cluster2Availability; + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index); + when(clusterInfo.clusterWriteAvailability()).thenReturn(availability); + return clusterInfo; + }); + assertThat(group.clusterWriteAvailability()) + .describedAs("clusterWriteAvailability retrieved from group contains entries from both clusters") + .hasSize(2) + .containsValues(WriteAvailability.AVAILABLE, WriteAvailability.AVAILABLE); Review Comment: I always feel better when doing a test like this if I have 2 _different_ values that I'm combining, so maybe change one of the two clusters' ring instance to `UNAVAILABLE_DOWN` and change this assert? ########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroupTest.java: ########## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; +import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo; +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.RingInstance; +import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils; +import org.apache.cassandra.spark.bulkwriter.WriteAvailability; +import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.data.partitioner.Partitioner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class CassandraClusterInfoGroupTest +{ + @Test + void testCreateGroupFailWithEmptyList() + { + assertThatThrownBy(() -> new CassandraClusterInfoGroup(null)) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("clusterInfos cannot be null or empty"); + + assertThatThrownBy(() -> new CassandraClusterInfoGroup(Collections.emptyList())) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("clusterInfos cannot be null or empty"); + } + + @Test + void testLookupCluster() + { + CassandraClusterInfoGroup group = mockClusterGroup(2, index -> mockClusterInfo("cluster" + index)); + List<ClusterInfo> clusters = 
group.clusters(); + assertThat(clusters).hasSize(2); + assertThat(group.cluster("cluster0")).isSameAs(clusters.get(0)); + assertThat(group.cluster("cluster1")).isSameAs(clusters.get(1)); + assertThat(group.cluster("cluster2")).isNull(); + } + + @Test + void testClusterId() + { + CassandraClusterInfoGroup group = mockClusterGroup(2, index -> mockClusterInfo("cluster" + index)); + assertThat(group.clusterId()).isEqualTo("ClusterInfoGroup: [cluster0, cluster1]"); + + group = mockClusterGroup(1, index -> mockClusterInfo("cluster" + index)); + assertThat(group.clusterId()).isEqualTo("ClusterInfoGroup: [cluster0]"); + } + + @Test + void testDelegationOfSingleCluster() + { + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster0"); + TokenRangeReplicasResponse response = TokenRangeMappingUtils.mockSimpleTokenRangeReplicasResponse(10, 3); + TokenRangeMapping<RingInstance> expectedTokenRangeMapping = TokenRangeMapping.create(() -> response, + () -> Partitioner.Murmur3Partitioner, + RingInstance::new); + when(clusterInfo.getTokenRangeMapping(anyBoolean())).thenReturn(expectedTokenRangeMapping); + when(clusterInfo.timeSkew(any())).thenReturn(mock(TimeSkewResponse.class)); + when(clusterInfo.getLowestCassandraVersion()).thenReturn("lowestCassandraVersion"); + when(clusterInfo.clusterWriteAvailability()).thenReturn(Collections.emptyMap()); + CassandraClusterInfoGroup group = mockClusterGroup(1, index -> clusterInfo); + // Since there is a single clusterInfo in the group. 
It behaves as a simple delegation to the sole clusterInfo + assertThat(group.clusterWriteAvailability()).isSameAs(clusterInfo.clusterWriteAvailability()); + assertThat(group.timeSkew(null)).isSameAs(clusterInfo.timeSkew(null)); + assertThat(group.getLowestCassandraVersion()).isSameAs(clusterInfo.getLowestCassandraVersion()); + assertThat(group.getTokenRangeMapping(true)).isSameAs(clusterInfo.getTokenRangeMapping(true)); + } + + @Test + void testAggregatePartitioner() + { + CassandraClusterInfoGroup invalidGroup = mockClusterGroup(2, index -> { + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index); + Partitioner partitioner = (index % 2 == 0) ? Partitioner.Murmur3Partitioner : Partitioner.RandomPartitioner; + when(clusterInfo.getPartitioner()).thenReturn(partitioner); + return clusterInfo; + }); + assertThatThrownBy(invalidGroup::getPartitioner) + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage("Clusters are not running with the same partitioner kind. Found partitioners: " + + "{cluster0=org.apache.cassandra.dht.Murmur3Partitioner, cluster1=org.apache.cassandra.dht.RandomPartitioner}"); + + CassandraClusterInfoGroup goodGroup = mockClusterGroup(2, index -> { + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index); + when(clusterInfo.getPartitioner()).thenReturn(Partitioner.Murmur3Partitioner); + return clusterInfo; + }); + assertThat(goodGroup.getPartitioner()).isEqualTo(Partitioner.Murmur3Partitioner); + } + + @Test + void testAggregateWriteAvailability() + { + Map<RingInstance, WriteAvailability> cluster1Availability = ImmutableMap.of(mock(RingInstance.class), WriteAvailability.AVAILABLE); + Map<RingInstance, WriteAvailability> cluster2Availability = ImmutableMap.of(mock(RingInstance.class), WriteAvailability.AVAILABLE); + CassandraClusterInfoGroup group = mockClusterGroup(2, index -> { + Map<RingInstance, WriteAvailability> availability = (index % 2 == 0) ? 
cluster1Availability : cluster2Availability; + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index); + when(clusterInfo.clusterWriteAvailability()).thenReturn(availability); + return clusterInfo; + }); + assertThat(group.clusterWriteAvailability()) + .describedAs("clusterWriteAvailability retrieved from group contains entries from both clusters") + .hasSize(2) + .containsValues(WriteAvailability.AVAILABLE, WriteAvailability.AVAILABLE); + } + + @Test + void testAggregateLowestCassandraVersion() + { + CassandraClusterInfoGroup goodGroup = mockClusterGroup(2, index -> { + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index); + when(clusterInfo.getLowestCassandraVersion()).thenReturn("4.0." + index); + return clusterInfo; + }); + assertThat(goodGroup.getLowestCassandraVersion()).isEqualTo("4.0.0"); + } + + @Test + void testAggregateLowestCassandraVersionFailDueToDifference() + { + CassandraClusterInfoGroup badGroup = mockClusterGroup(2, index -> { + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index); + when(clusterInfo.getLowestCassandraVersion()).thenReturn((4 + index) + ".0.0"); + return clusterInfo; + }); + assertThatThrownBy(badGroup::getLowestCassandraVersion) + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage("Cluster versions are not compatible. lowest=4.0.0 and highest=5.0.0"); + } + + @Test + void testCheckBulkWriterIsEnabledOrThrow() + { + CassandraClusterInfoGroup badGroup = mockClusterGroup(2, index -> { + CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index); + if (index == 0) Review Comment: Similar to the "make sure I have different values" comment above, it would be good to test a non-zero index here (or randomize the index you use) - otherwise, an implementation that aggregates this information across all clusters and one that just calls the `checkBulkWriterIsEnabledOrThrow()` method on the first cluster would both pass the test. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org