Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/5176#discussion_r157548338 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java --- @@ -127,21 +132,31 @@ private static BlobStoreService createFileSystemBlobStore(Configuration configur } /** - * Creates a local storage directory for a blob service under the given parent directory. + * Creates a local storage directory for a blob service under the configuration parameter given + * by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is <tt>null</tt> or empty, we will + * fall back to the TaskManager temp directories (given by + * {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY}; which in turn falls back to + * {@link ConfigConstants#DEFAULT_TASK_MANAGER_TMP_PATH} currently set to + * <tt>java.io.tmpdir</tt>) and choose one among them at random. * - * @param basePath - * base path, i.e. parent directory, of the storage directory to use (if <tt>null</tt> or - * empty, the path in <tt>java.io.tmpdir</tt> will be used) + * @param config + * Flink configuration * * @return a new local storage directory * * @throws IOException * thrown if the local file storage cannot be created or is not usable */ - static File initLocalStorageDirectory(String basePath) throws IOException { + static File initLocalStorageDirectory(Configuration config) throws IOException { + + String basePath = config.getString(BlobServerOptions.STORAGE_DIRECTORY); + File baseDir; if (StringUtils.isNullOrWhitespaceOnly(basePath)) { - baseDir = new File(System.getProperty("java.io.tmpdir")); + final String[] tmpDirPaths = config.getString( + ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator); --- End diff -- Consider encapsulating this parsing logic into `TaskManagerServicesConfiguration` or similar.
---