Hi Paweł,

I believe this is a bug. I don't think many people use Flink to write to an FTP server, which is probably why this hasn't been addressed yet. There's probably something off with the semantics of distributed vs. non-distributed file systems. I guess the easiest way to get to the bottom of this is to run your Flink job from within an IDE and set a breakpoint in FileOutputFormat.initializeGlobal() to see what goes wrong there.
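If attaching a debugger feels like too much effort, a small standalone probe against Flink's FileSystem abstraction might already tell you something. This is only a sketch (the class name and the user:pass placeholders are mine, and it assumes flink-core plus the same Hadoop-based FTP file system support your job already uses are on the classpath), not a verified fix:

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class FtpFsProbe {
    public static void main(String[] args) throws Exception {
        // Same output URI as in your job below; user, password and path are placeholders.
        Path outputPath = new Path("ftp://user:pass@localhost:21/test");
        FileSystem fs = outputPath.getFileSystem();

        // Which implementation Flink resolves for ftp://, and whether it reports
        // itself as distributed. FileOutputFormat.initializeGlobal() only tries to
        // pre-create the output directory for file systems that claim to be distributed.
        System.out.println("FileSystem implementation: " + fs.getClass().getName());
        System.out.println("isDistributedFS():         " + fs.isDistributedFS());

        // Try the directory creation by hand and see what the file system reports back.
        System.out.println("mkdirs() returned:         " + fs.mkdirs(outputPath));
        System.out.println("exists() afterwards:       " + fs.exists(outputPath));
    }
}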
Best,
Robert

On Fri, Jul 10, 2020 at 7:00 PM Paweł Goliszewski <pawlik9.hams...@gmail.com> wrote:

> Hi to all,
>
> I tried to send a file from local storage to an FTP server in a Docker
> container (image: stilliard/pure-ftpd) using Flink 1.10 with Hadoop 2.8.5.
> I tried to do so with the following code:
>
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> DataSource<Tuple1<String>> lines =
>         env.readCsvFile("file:///path/to/myfile.csv").types(String.class);
>
> lines.writeAsCsv("ftp://user:pass@localhost:21/test");
>
> env.execute();
>
> After running it I get an exception with the following stack trace:
>
> Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>     at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:952)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
>     at org.example.BatchJob.main(BatchJob.java:44)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:947)
>     ... 3 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
>     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>     ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: ftp://goli:kotlet@localhost:21/test, delimiter: ,))': Output directory could not be created.
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:215)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>     at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
>     ... 10 more
> Caused by: java.io.IOException: Output directory could not be created.
>     at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:296)
>     at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:100)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212)
>     ... 20 more
>
> But the directory is created despite the exception. I tried several
> FTP-server Docker images, but the result is similar. Is it a bug or should
> it be done in another way?
>
> Thanks in advance,
> Paweł
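P.S. For anyone else following the stack trace: the IOException at FileOutputFormat.java:296 is thrown when FileSystem.initOutPathDistFS() returns false for the output path, which would explain how "Output directory could not be created." can coexist with the directory actually showing up on the server. The snippet below reproduces that single check outside of a job; it is a sketch based on my reading of the Flink 1.10 sources, using the same placeholder ftp:// URI as above, so treat the details with care.

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

public class InitOutPathCheck {
    public static void main(String[] args) throws Exception {
        // Same output URI as in the job above; credentials and path are placeholders.
        Path outPath = new Path("ftp://user:pass@localhost:21/test");
        FileSystem fs = outPath.getFileSystem();

        // As far as I can tell, this is the call FileOutputFormat.initializeGlobal()
        // makes for distributed file systems; a 'false' result is what gets turned
        // into "Output directory could not be created."
        boolean ok = fs.initOutPathDistFS(outPath, WriteMode.NO_OVERWRITE, true);
        System.out.println("initOutPathDistFS returned: " + ok);

        // Cross-check what the file system itself says about the directory afterwards.
        System.out.println("exists(outPath):            " + fs.exists(outPath));
    }
}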