yifan-c commented on code in PR #107: URL: https://github.com/apache/cassandra-analytics/pull/107#discussion_r2047944247
########## cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java: ########## @@ -69,94 +56,56 @@ public SSTableWriterImplementation(String inDirectory, @NotNull Set<String> userDefinedTypeStatements, int bufferSizeMB) { - this(inDirectory, partitioner, createStatement, insertStatement, userDefinedTypeStatements, bufferSizeMB, 10); + this(inDirectory, determineSupportedPartitioner(partitioner), createStatement, insertStatement, userDefinedTypeStatements, bufferSizeMB); } @VisibleForTesting - SSTableWriterImplementation(String inDirectory, - String partitioner, - String createStatement, - String insertStatement, - @NotNull Set<String> userDefinedTypeStatements, - int bufferSizeMB, - long sstableWatcherDelaySeconds) + public SSTableWriterImplementation(String inDirectory, + IPartitioner partitioner, + String createStatement, + String insertStatement, + @NotNull Set<String> userDefinedTypeStatements, + int bufferSizeMB) { - IPartitioner cassPartitioner = partitioner.toLowerCase().contains("random") ? new RandomPartitioner() - : new Murmur3Partitioner(); - this.writer = configureBuilder(inDirectory, createStatement, insertStatement, bufferSizeMB, userDefinedTypeStatements, - cassPartitioner) + this::onSSTablesProduced, + partitioner) .build(); - this.outputDir = Paths.get(inDirectory); - this.sstableWatcher = new SSTableWatcher(sstableWatcherDelaySeconds); } - private class SSTableWatcher implements Closeable + private static IPartitioner determineSupportedPartitioner(String partitioner) { - // The TOC component is the last one flushed when finishing a SSTable. 
- // Therefore, it monitors the creation of the TOC component to determine the creation of SSTable - private static final String TOC_COMPONENT_SUFFIX = "-TOC.txt"; - private static final String GLOB_PATTERN_FOR_TOC = "*" + TOC_COMPONENT_SUFFIX; - - private final ScheduledExecutorService sstableWatcherScheduler; - private final Set<SSTableDescriptor> knownSSTables; - - SSTableWatcher(long delaySeconds) - { - ThreadFactory tf = ThreadUtil.threadFactory("SSTableWatcher-" + outputDir.getFileName().toString()); - this.sstableWatcherScheduler = Executors.newSingleThreadScheduledExecutor(tf); - this.knownSSTables = new HashSet<>(); - sstableWatcherScheduler.scheduleWithFixedDelay(this::listSSTables, delaySeconds, delaySeconds, TimeUnit.SECONDS); - } + return partitioner.toLowerCase().contains("random") + ? new RandomPartitioner() + : new Murmur3Partitioner(); + } - private void listSSTables() - { - try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDir, GLOB_PATTERN_FOR_TOC)) - { - HashSet<SSTableDescriptor> newlyProducedSSTables = new HashSet<>(); - stream.forEach(path -> { - String baseFilename = path.getFileName().toString().replace(TOC_COMPONENT_SUFFIX, ""); - SSTableDescriptor sstable = new SSTableDescriptor(baseFilename); - if (!knownSSTables.contains(sstable)) - { - newlyProducedSSTables.add(sstable); - } - }); - - if (!newlyProducedSSTables.isEmpty()) - { - knownSSTables.addAll(newlyProducedSSTables); - producedSSTablesListener.accept(newlyProducedSSTables); - } - } - catch (IOException e) - { - LOGGER.warn("Fails to list SSTables", e); - } - } + private void onSSTablesProduced(Collection<SSTableReader> sstables) + { + Objects.requireNonNull(producedSSTablesListener, "producedSSTablesListener is not set"); + Set<SSTableDescriptor> sstableDescriptors = sstables + .stream() + .map(sstable -> { + String baseFilename = baseFilename(sstable.descriptor); + // TODO: for now, the sstableReader is closed immediately, Review Comment: I left the todo because 
the change would be more invasive, and I feel it is unrelated to this PR. That step does not simply re-read the data back; it also calculates digests, etc., so some refactoring would be required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org