showuon commented on a change in pull request #10862:
URL: https://github.com/apache/kafka/pull/10862#discussion_r650651936


##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -126,7 +126,7 @@ public StateDirectory(final StreamsConfig config, final 
Time time, final boolean
                 throw new ProcessorStateException(
                     String.format("base state directory [%s] doesn't exist and 
couldn't be created", stateDirName));
             }
-            if (!stateDir.exists() && !stateDir.mkdir()) {
+            if ((stateDir.exists() && !stateDir.isDirectory()) || 
(!stateDir.exists() && !stateDir.mkdir())) {

Review comment:
       +1

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -230,18 +230,25 @@ public UUID initializeProcessId() {
     public File getOrCreateDirectoryForTask(final TaskId taskId) {
         final File taskParentDir = getTaskDirectoryParentName(taskId);
         final File taskDir = new File(taskParentDir, 
StateManagerUtil.toTaskDirString(taskId));
-        if (hasPersistentStores && !taskDir.exists()) {
-            synchronized (taskDirCreationLock) {
-                // to avoid a race condition, we need to check again if the 
directory does not exist:
-                // otherwise, two threads might pass the outer `if` (and enter 
the `then` block),
-                // one blocks on `synchronized` while the other creates the 
directory,
-                // and the blocking one fails when trying to create it after 
it's unblocked
-                if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
-                    throw new ProcessorStateException(
+        if (hasPersistentStores) {
+            if (!taskDir.exists()) {
+                synchronized (taskDirCreationLock) {
+                    // to avoid a race condition, we need to check again if 
the directory does not exist:
+                    // otherwise, two threads might pass the outer `if` (and 
enter the `then` block),
+                    // one blocks on `synchronized` while the other creates 
the directory,
+                    // and the blocking one fails when trying to create it 
after it's unblocked
+                    if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
+                        throw new ProcessorStateException(
                             String.format("Parent [%s] of task directory [%s] 
doesn't exist and couldn't be created",
-                                    taskParentDir.getPath(), 
taskDir.getPath()));
+                                taskParentDir.getPath(), taskDir.getPath()));
+                    }
+                    if (!taskDir.exists() && !taskDir.mkdir()) {
+                        throw new ProcessorStateException(
+                            String.format("task directory [%s] doesn't exist 
and couldn't be created", taskDir.getPath()));
+                    }
                 }
-                if (!taskDir.exists() && !taskDir.mkdir()) {
+            } else {
+                if (!taskDir.isDirectory()) {

Review comment:
       +1

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -230,18 +230,25 @@ public UUID initializeProcessId() {
     public File getOrCreateDirectoryForTask(final TaskId taskId) {
         final File taskParentDir = getTaskDirectoryParentName(taskId);
         final File taskDir = new File(taskParentDir, 
StateManagerUtil.toTaskDirString(taskId));
-        if (hasPersistentStores && !taskDir.exists()) {
-            synchronized (taskDirCreationLock) {
-                // to avoid a race condition, we need to check again if the 
directory does not exist:
-                // otherwise, two threads might pass the outer `if` (and enter 
the `then` block),
-                // one blocks on `synchronized` while the other creates the 
directory,
-                // and the blocking one fails when trying to create it after 
it's unblocked
-                if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
-                    throw new ProcessorStateException(
+        if (hasPersistentStores) {
+            if (!taskDir.exists()) {
+                synchronized (taskDirCreationLock) {
+                    // to avoid a race condition, we need to check again if 
the directory does not exist:
+                    // otherwise, two threads might pass the outer `if` (and 
enter the `then` block),
+                    // one blocks on `synchronized` while the other creates 
the directory,
+                    // and the blocking one fails when trying to create it 
after it's unblocked
+                    if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
+                        throw new ProcessorStateException(
                             String.format("Parent [%s] of task directory [%s] 
doesn't exist and couldn't be created",
-                                    taskParentDir.getPath(), 
taskDir.getPath()));
+                                taskParentDir.getPath(), taskDir.getPath()));
+                    }
+                    if (!taskDir.exists() && !taskDir.mkdir()) {
+                        throw new ProcessorStateException(
+                            String.format("task directory [%s] doesn't exist 
and couldn't be created", taskDir.getPath()));
+                    }
                 }
-                if (!taskDir.exists() && !taskDir.mkdir()) {
+            } else {
+                if (!taskDir.isDirectory()) {

Review comment:
       nit: we could collapse this `else { if (...) }` into a single `else if` here.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##########
@@ -236,6 +236,30 @@ public void shouldThrowProcessorStateException() throws 
IOException {
         assertThrows(ProcessorStateException.class, () -> 
directory.getOrCreateDirectoryForTask(taskId));
     }
 
+    @Test
+    public void shouldThrowProcessorStateExceptionIfStateDirOccupied() throws 
IOException {
+        final TaskId taskId = new TaskId(0, 0);
+
+        // Replace application's stateDir to regular file
+        Utils.delete(appDir);
+        appDir.createNewFile();
+
+        assertThrows(ProcessorStateException.class, () -> 
directory.getOrCreateDirectoryForTask(taskId));
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionIfAppDirOccupied() throws 
IOException {

Review comment:
       I think this test covers the case where `testDir` is occupied, not `AppDir`, so the method name is misleading.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to