ableegoldman commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r448703709
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -136,7 +143,59 @@ private boolean taskDirEmpty(final File taskDir) { !pathname.getName().equals(CHECKPOINT_FILE_NAME)); // if the task is stateless, storeDirs would be null - return storeDirs == null || storeDirs.length == 0; + if (storeDirs == null || storeDirs.length == 0) { + return true; + } + + final List<File> baseSubDirectories = new LinkedList<>(); + for (final File file : storeDirs) { + if (file.isDirectory()) { + baseSubDirectories.add(file); + } else { + return false; + } + } + + for (final File dir : baseSubDirectories) { + final boolean isEmpty; + if (dir.getName().equals(ROCKSDB_DIRECTORY_NAME)) { + isEmpty = taskSubDirectoriesEmpty(dir, true); + } else { + isEmpty = taskSubDirectoriesEmpty(dir, false); + } + if (!isEmpty) { + return false; + } + } + return true; + } + + // BFS through the task directory to look for any files that are not more subdirectories + private boolean taskSubDirectoriesEmpty(final File baseDir, final boolean sstOnly) { + final Queue<File> subDirectories = new LinkedList<>(); + subDirectories.offer(baseDir); + + final Set<File> visited = new HashSet<>(); + while (!subDirectories.isEmpty()) { + final File dir = subDirectories.poll(); + if (!visited.contains(dir)) { + final File[] files = dir.listFiles(); + if (files == null) { + continue; + } + for (final File file : files) { + if (file.isDirectory()) { + subDirectories.offer(file); + } else if (sstOnly && file.getName().endsWith(ROCKSDB_SST_SUFFIX)) { Review comment: OK, I discussed this offline with Guozhang: we should go forward with the "write sentinel values for unknown offset" fix, since it will also be needed for other work soon in 2.7. I'll remove this hacky emptiness check. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org