yifan-c commented on code in PR #107:
URL: 
https://github.com/apache/cassandra-analytics/pull/107#discussion_r2047944247


##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java:
##########
@@ -69,94 +56,56 @@ public SSTableWriterImplementation(String inDirectory,
                                        @NotNull Set<String> 
userDefinedTypeStatements,
                                        int bufferSizeMB)
     {
-        this(inDirectory, partitioner, createStatement, insertStatement, 
userDefinedTypeStatements, bufferSizeMB, 10);
+        this(inDirectory, determineSupportedPartitioner(partitioner), 
createStatement, insertStatement, userDefinedTypeStatements, bufferSizeMB);
     }
 
     @VisibleForTesting
-    SSTableWriterImplementation(String inDirectory,
-                                String partitioner,
-                                String createStatement,
-                                String insertStatement,
-                                @NotNull Set<String> userDefinedTypeStatements,
-                                int bufferSizeMB,
-                                long sstableWatcherDelaySeconds)
+    public SSTableWriterImplementation(String inDirectory,
+                                       IPartitioner partitioner,
+                                       String createStatement,
+                                       String insertStatement,
+                                       @NotNull Set<String> 
userDefinedTypeStatements,
+                                       int bufferSizeMB)
     {
-        IPartitioner cassPartitioner = 
partitioner.toLowerCase().contains("random") ? new RandomPartitioner()
-                                                                               
     : new Murmur3Partitioner();
-
         this.writer = configureBuilder(inDirectory,
                                        createStatement,
                                        insertStatement,
                                        bufferSizeMB,
                                        userDefinedTypeStatements,
-                                       cassPartitioner)
+                                       this::onSSTablesProduced,
+                                       partitioner)
                       .build();
-        this.outputDir = Paths.get(inDirectory);
-        this.sstableWatcher = new SSTableWatcher(sstableWatcherDelaySeconds);
     }
 
-    private class SSTableWatcher implements Closeable
+    private static IPartitioner determineSupportedPartitioner(String 
partitioner)
     {
-        // The TOC component is the last one flushed when finishing a SSTable.
-        // Therefore, it monitors the creation of the TOC component to 
determine the creation of SSTable
-        private static final String TOC_COMPONENT_SUFFIX = "-TOC.txt";
-        private static final String GLOB_PATTERN_FOR_TOC = "*" + 
TOC_COMPONENT_SUFFIX;
-
-        private final ScheduledExecutorService sstableWatcherScheduler;
-        private final Set<SSTableDescriptor> knownSSTables;
-
-        SSTableWatcher(long delaySeconds)
-        {
-            ThreadFactory tf = ThreadUtil.threadFactory("SSTableWatcher-" + 
outputDir.getFileName().toString());
-            this.sstableWatcherScheduler = 
Executors.newSingleThreadScheduledExecutor(tf);
-            this.knownSSTables = new HashSet<>();
-            sstableWatcherScheduler.scheduleWithFixedDelay(this::listSSTables, 
delaySeconds, delaySeconds, TimeUnit.SECONDS);
-        }
+        return partitioner.toLowerCase().contains("random")
+               ? new RandomPartitioner()
+               : new Murmur3Partitioner();
+    }
 
-        private void listSSTables()
-        {
-            try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(outputDir, GLOB_PATTERN_FOR_TOC))
-            {
-                HashSet<SSTableDescriptor> newlyProducedSSTables = new 
HashSet<>();
-                stream.forEach(path -> {
-                    String baseFilename = 
path.getFileName().toString().replace(TOC_COMPONENT_SUFFIX, "");
-                    SSTableDescriptor sstable = new 
SSTableDescriptor(baseFilename);
-                    if (!knownSSTables.contains(sstable))
-                    {
-                        newlyProducedSSTables.add(sstable);
-                    }
-                });
-
-                if (!newlyProducedSSTables.isEmpty())
-                {
-                    knownSSTables.addAll(newlyProducedSSTables);
-                    producedSSTablesListener.accept(newlyProducedSSTables);
-                }
-            }
-            catch (IOException e)
-            {
-                LOGGER.warn("Fails to list SSTables", e);
-            }
-        }
+    private void onSSTablesProduced(Collection<SSTableReader> sstables)
+    {
+        Objects.requireNonNull(producedSSTablesListener, 
"producedSSTablesListener is not set");
+        Set<SSTableDescriptor> sstableDescriptors = sstables
+                                                    .stream()
+                                                    .map(sstable -> {
+                                                        String baseFilename = 
baseFilename(sstable.descriptor);
+                                                        // TODO: for now, the 
sstableReader is closed immediately,

Review Comment:
   I left the TODO in place because addressing it would be a more invasive 
change, and I feel it is unrelated to this PR. The step does not simply re-read 
the data back; it also calculates digests, etc. Some refactoring will be required. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to