echauchot commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1142115352
##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java:
##########
@@ -75,54 +77,68 @@ public SplitsGenerator(
      * @return list containing {@code numSplits} CassandraSplits.
      */
     public List<CassandraSplit> generateSplits() {
+        long numSplits = decideOnNumSplits();
+        List<CassandraSplit> splits = new ArrayList<>();
+        BigInteger increment =
+                (partitioner.ringSize).divide(new BigInteger(String.valueOf(numSplits)));
+
+        BigInteger startToken = partitioner.minToken;
+        for (int splitCount = 1; splitCount <= numSplits; splitCount++) {
+            BigInteger endToken = startToken.add(increment);
+            if (splitCount == numSplits) {
+                endToken = partitioner.maxToken;
+            }
+            splits.add(new CassandraSplit(startToken, endToken));
+            startToken = endToken;
+        }
+        LOG.debug("Generated {} splits : {}", splits.size(), splits);
+        return splits;
+    }
+
+    /**
+     * Determine {@code numSplits} based on the estimation of the target table size and user defined
+     * {@code maxSplitMemorySize}. Add fallbacks when table size is unavailable, too few or too many
+     * splits are calculated.
+     */
+    private long decideOnNumSplits() {
         long numSplits;
         if (maxSplitMemorySize != null) {
+            checkState(

Review Comment:
:+1:

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org