echauchot commented on code in PR #3:
URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1142115352


##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java:
##########
@@ -75,54 +77,68 @@ public SplitsGenerator(
      * @return list containing {@code numSplits} CassandraSplits.
      */
     public List<CassandraSplit> generateSplits() {
+        long numSplits = decideOnNumSplits();
+        List<CassandraSplit> splits = new ArrayList<>();
+        BigInteger increment =
+                (partitioner.ringSize).divide(new BigInteger(String.valueOf(numSplits)));
+
+        BigInteger startToken = partitioner.minToken;
+        for (int splitCount = 1; splitCount <= numSplits; splitCount++) {
+            BigInteger endToken = startToken.add(increment);
+            if (splitCount == numSplits) {
+                endToken = partitioner.maxToken;
+            }
+            splits.add(new CassandraSplit(startToken, endToken));
+            startToken = endToken;
+        }
+        LOG.debug("Generated {} splits : {}", splits.size(), splits);
+        return splits;
+    }
+
+    /**
+     * Determine {@code numSplits} based on the estimation of the target table size and user defined
+     * {@code maxSplitMemorySize}. Add fallbacks when table size is unavailable, too few or too many
+     * splits are calculated.
+     */
+    private long decideOnNumSplits() {
         long numSplits;
         if (maxSplitMemorySize != null) {
+            checkState(

Review Comment:
   :+1: 
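The `generateSplits` change in the diff divides the partitioner's token ring into `numSplits` equal-width ranges, chains each range's start to the previous range's end, and pins the last range's end to `maxToken` so the remainder of the integer division is not lost. A minimal standalone Java sketch of that idea follows, assuming tokens are plain `BigInteger`s and that the ring size is `maxToken - minToken + 1` (the real partitioner's `ringSize` definition may differ). `TokenRangeSplitSketch`, `TokenRange` and the sizing heuristic in `decideOnNumSplits` are hypothetical: the diff is cut off inside `decideOnNumSplits`, so the table-size estimate, default and clamping bounds here only illustrate the behaviour its javadoc describes, not the connector's actual code.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch; TokenRangeSplitSketch, TokenRange and the sizing heuristic are hypothetical. */
public class TokenRangeSplitSketch {

    /** Hypothetical stand-in for CassandraSplit: the start and end tokens of one split. */
    public static final class TokenRange {
        public final BigInteger start;
        public final BigInteger end;

        TokenRange(BigInteger start, BigInteger end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /**
     * Hypothetical heuristic following the javadoc's description: one split per
     * maxSplitMemorySize bytes of estimated table size (rounded up), defaultNumSplits
     * when the size is unknown, then clamped to [minSplits, maxSplits].
     */
    public static long decideOnNumSplits(
            Long estimatedTableSize, long maxSplitMemorySize,
            long defaultNumSplits, long minSplits, long maxSplits) {
        if (maxSplitMemorySize <= 0) {
            throw new IllegalArgumentException("maxSplitMemorySize must be > 0");
        }
        long numSplits;
        if (estimatedTableSize == null || estimatedTableSize <= 0) {
            numSplits = defaultNumSplits; // size unavailable: hypothetical fallback
        } else {
            // ceiling division without the overflow risk of (a + b - 1) / b
            numSplits = estimatedTableSize / maxSplitMemorySize
                    + (estimatedTableSize % maxSplitMemorySize == 0 ? 0 : 1);
        }
        // guard against too few or too many splits
        return Math.max(minSplits, Math.min(maxSplits, numSplits));
    }

    /**
     * Cuts [minToken, maxToken] into numSplits consecutive ranges of equal width;
     * the last range ends exactly at maxToken so the division remainder is not lost.
     * Assumes ringSize = maxToken - minToken + 1.
     */
    public static List<TokenRange> generateSplits(
            BigInteger minToken, BigInteger maxToken, long numSplits) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("numSplits must be >= 1");
        }
        BigInteger ringSize = maxToken.subtract(minToken).add(BigInteger.ONE);
        BigInteger increment = ringSize.divide(BigInteger.valueOf(numSplits));
        List<TokenRange> splits = new ArrayList<>();
        BigInteger startToken = minToken;
        for (long splitCount = 1; splitCount <= numSplits; splitCount++) {
            // each split starts where the previous one ended; the last one ends at maxToken
            BigInteger endToken =
                    splitCount == numSplits ? maxToken : startToken.add(increment);
            splits.add(new TokenRange(startToken, endToken));
            startToken = endToken;
        }
        return splits;
    }

    public static void main(String[] args) {
        long numSplits = decideOnNumSplits(1_000L, 300L, 8, 1, 16); // ceil(1000 / 300) = 4
        // prints [0, 25], [25, 50], [50, 75], [75, 99] on separate lines
        for (TokenRange split : generateSplits(BigInteger.ZERO, BigInteger.valueOf(99), numSplits)) {
            System.out.println(split);
        }
    }
}
```

The clamp at the end of `decideOnNumSplits` is where the "too few or too many splits" fallbacks from the javadoc would naturally sit; the bounds the PR actually uses are not visible in this excerpt. The sketch also leaves open whether a split's end token is inclusive or exclusive when the range is turned into a CQL `token(...)` predicate.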



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.