The error message suggests that Flink tries to resolve "D:" as a file system scheme, such as "file:" or "hdfs:".
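(As a side note, this scheme parsing can be reproduced outside of Flink with plain java.net.URI: the drive letter of a bare Windows path is picked up as the URI scheme, which is what the file system lookup then fails on. A minimal JDK-only sketch, not Flink code:)

```java
import java.net.URI;

public class SchemeDemo {
    public static void main(String[] args) {
        // The drive letter of a bare Windows path parses as a URI scheme.
        URI bare = URI.create("D:/dir/myfile.csv");
        System.out.println(bare.getScheme());     // prints "D"

        // An explicit "file:/" prefix yields a scheme the file system registry knows.
        URI prefixed = URI.create("file:/D:/dir/myfile.csv");
        System.out.println(prefixed.getScheme()); // prints "file"
    }
}
```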
Can you try to specify your path as "file:/D:/dir/myfile.csv"?

Best, Fabian

2016-10-20 14:41 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I know that Flink in general supports files also on Windows. For example, I just tested successfully with relative file paths (e.g. placing the file in the local directory and giving just the file name, then everything works correctly). However, with absolute paths it does not work, as per my previous explanation. Nevertheless, please see the error log below.
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: No file system found with scheme D, referenced in file URI 'D:/dir/myfile.csv'.
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:297)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:120)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
>
> In addition, there is a further error if I dig through the output logs:
>
> 4:33:32,651 ERROR org.apache.hadoop.util.Shell - Failed to locate the winutils binary in the hadoop binary path
> java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
>     at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
>     at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
>     at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
>     at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
>     at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:92)
>     at org.apache.hadoop.security.Groups.<init>(Groups.java:76)
>     at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:239)
>     at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
>     at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
>     at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
>     at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
>     at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
>     at org.apache.hadoop.fs.viewfs.ViewFileSystem.<init>(ViewFileSystem.java:130)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at java.lang.Class.newInstance(Class.java:379)
>     at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
>     at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
>     at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2283)
>     at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2294)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getDefaultHDFSClass(HadoopFileSystem.java:91)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.<init>(HadoopFileSystem.java:75)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at org.apache.flink.core.fs.FileSystem.instantiateHadoopFileSystemWrapper(FileSystem.java:334)
>     at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:358)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:120)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
>
> From: Chesnay Schepler [mailto:ches...@apache.org]
> Sent: Thursday, October 20, 2016 2:22 PM
> To: user@flink.apache.org
> Subject: Re: org.apache.flink.core.fs.Path error?
>
> Hello Radu,
>
> Flink can handle Windows paths, so this alone can't be the problem. If you could post the error you are getting, we may pinpoint the issue, but right now I would suggest the usual: check that the path is indeed correct and that you have sufficient permissions to access the file.
> And yes, you can report problems here ;)
>
> Regards,
> Chesnay
>
> On 20.10.2016 13:17, Radu Tudoran wrote:
>
> Hi,
>
> I am running a program that is supposed to read a CSV file from the local disk (I am still using Flink 1.1; I did not check whether the situation is the same for 1.2). I am currently running the test on a Windows OS.
>
> I am creating the path to the file, e.g. "D:\\dir\\myfile.csv". However, I see that the CSV reader converts this to a Path object from flink-core:
>
>     val inputFormat = new TupleCsvInputFormat(new Path(path), rowDelim, fieldDelim, typeInfo)
>
> in CSVTableSource. This ends up representing the initial path as a URI and changes \ to /, resulting in "D:/dir/myfile.csv". The problem is that this is never changed back when the file is actually opened and accessed, which leads to an error.
>
> ...not sure if signaling this error here is the best place or if I should have used some other medium.
>
> Best regards,
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
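Until the Path handling is addressed, one workaround for the conversion Radu describes is to build the explicit file: URI by hand before handing the path to Flink, so no drive letter can be mistaken for a scheme. A hypothetical helper (the toFileUri name is made up; this is not Flink API):

```java
public class WindowsPathDemo {
    // Hypothetical helper: turn a Windows path like D:\dir\myfile.csv
    // into an explicit file: URI string.
    static String toFileUri(String windowsPath) {
        return "file:/" + windowsPath.replace('\\', '/');
    }

    public static void main(String[] args) {
        System.out.println(toFileUri("D:\\dir\\myfile.csv")); // prints file:/D:/dir/myfile.csv
    }
}
```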
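A footnote on the second error in Radu's log: the winutils.exe failure is a separate, well-known Hadoop-on-Windows issue. Hadoop's Shell class builds the path from the hadoop.home.dir system property (the "null" in null\bin\winutils.exe is that unset property, with HADOOP_HOME as the environment fallback). Assuming winutils.exe has been placed under e.g. D:\hadoop\bin (an assumed location), one common workaround is to set the property before the Flink environment is created:

```java
public class WinutilsWorkaround {
    public static void main(String[] args) {
        // D:\hadoop is an assumed install location; winutils.exe must actually
        // exist at <hadoop.home.dir>\bin\winutils.exe for Hadoop's Shell class.
        System.setProperty("hadoop.home.dir", "D:\\hadoop");
        // ... then create the StreamExecutionEnvironment as usual ...
        System.out.println(System.getProperty("hadoop.home.dir")); // prints D:\hadoop
    }
}
```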