showuon commented on a change in pull request #10862: URL: https://github.com/apache/kafka/pull/10862#discussion_r650651936
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -126,7 +126,7 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean throw new ProcessorStateException( String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName)); } - if (!stateDir.exists() && !stateDir.mkdir()) { + if ((stateDir.exists() && !stateDir.isDirectory()) || (!stateDir.exists() && !stateDir.mkdir())) { Review comment: +1 ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -230,18 +230,25 @@ public UUID initializeProcessId() { public File getOrCreateDirectoryForTask(final TaskId taskId) { final File taskParentDir = getTaskDirectoryParentName(taskId); final File taskDir = new File(taskParentDir, StateManagerUtil.toTaskDirString(taskId)); - if (hasPersistentStores && !taskDir.exists()) { - synchronized (taskDirCreationLock) { - // to avoid a race condition, we need to check again if the directory does not exist: - // otherwise, two threads might pass the outer `if` (and enter the `then` block), - // one blocks on `synchronized` while the other creates the directory, - // and the blocking one fails when trying to create it after it's unblocked - if (!taskParentDir.exists() && !taskParentDir.mkdir()) { - throw new ProcessorStateException( + if (hasPersistentStores) { + if (!taskDir.exists()) { + synchronized (taskDirCreationLock) { + // to avoid a race condition, we need to check again if the directory does not exist: + // otherwise, two threads might pass the outer `if` (and enter the `then` block), + // one blocks on `synchronized` while the other creates the directory, + // and the blocking one fails when trying to create it after it's unblocked + if (!taskParentDir.exists() && !taskParentDir.mkdir()) { + throw new ProcessorStateException( String.format("Parent [%s] of task directory [%s] doesn't 
exist and couldn't be created", - taskParentDir.getPath(), taskDir.getPath())); + taskParentDir.getPath(), taskDir.getPath())); + } + if (!taskDir.exists() && !taskDir.mkdir()) { + throw new ProcessorStateException( + String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath())); + } } - if (!taskDir.exists() && !taskDir.mkdir()) { + } else { + if (!taskDir.isDirectory()) { Review comment: +1 ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -230,18 +230,25 @@ public UUID initializeProcessId() { public File getOrCreateDirectoryForTask(final TaskId taskId) { final File taskParentDir = getTaskDirectoryParentName(taskId); final File taskDir = new File(taskParentDir, StateManagerUtil.toTaskDirString(taskId)); - if (hasPersistentStores && !taskDir.exists()) { - synchronized (taskDirCreationLock) { - // to avoid a race condition, we need to check again if the directory does not exist: - // otherwise, two threads might pass the outer `if` (and enter the `then` block), - // one blocks on `synchronized` while the other creates the directory, - // and the blocking one fails when trying to create it after it's unblocked - if (!taskParentDir.exists() && !taskParentDir.mkdir()) { - throw new ProcessorStateException( + if (hasPersistentStores) { + if (!taskDir.exists()) { + synchronized (taskDirCreationLock) { + // to avoid a race condition, we need to check again if the directory does not exist: + // otherwise, two threads might pass the outer `if` (and enter the `then` block), + // one blocks on `synchronized` while the other creates the directory, + // and the blocking one fails when trying to create it after it's unblocked + if (!taskParentDir.exists() && !taskParentDir.mkdir()) { + throw new ProcessorStateException( String.format("Parent [%s] of task directory [%s] doesn't exist and couldn't be created", - taskParentDir.getPath(), taskDir.getPath())); + 
taskParentDir.getPath(), taskDir.getPath())); + } + if (!taskDir.exists() && !taskDir.mkdir()) { + throw new ProcessorStateException( + String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath())); + } } - if (!taskDir.exists() && !taskDir.mkdir()) { + } else { + if (!taskDir.isDirectory()) { Review comment: nit: we could use `else if` here. ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ########## @@ -236,6 +236,30 @@ public void shouldThrowProcessorStateException() throws IOException { assertThrows(ProcessorStateException.class, () -> directory.getOrCreateDirectoryForTask(taskId)); } + @Test + public void shouldThrowProcessorStateExceptionIfStateDirOccupied() throws IOException { + final TaskId taskId = new TaskId(0, 0); + + // Replace application's stateDir to regular file + Utils.delete(appDir); + appDir.createNewFile(); + + assertThrows(ProcessorStateException.class, () -> directory.getOrCreateDirectoryForTask(taskId)); + } + + @Test + public void shouldThrowProcessorStateExceptionIfAppDirOccupied() throws IOException { Review comment: I think we're testing the case where `testDir` is occupied here, not `AppDir`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org