Hi Kostas Thanks for the reply, Yep i am planning to implement the same.
On Mon, Mar 26, 2018 at 7:53 PM, Kostas Kloudas <k.klou...@data-artisans.com > wrote: > Hi Puneet, > > If you mean that after processing a file, you want to move it to another > directory outside the one containing > the data to be processed, then I am afraid that this is currently not > possible. This is because the whole logic > of how to treat files is included in your FileInputFormat. > > It may be possible to do it if you implement your custom FileInputFormat > that creates the splits, moves the file, > and modifies the splits to point to the new location of the file before > shipping them downstream to be read > (but I have not done it). > > Keep in mind that if you do not change the contents of the file, then it > will not be reprocessed. > > Cheers, > Kostas > > On Mar 26, 2018, at 12:18 PM, Puneet Kinra <puneet.kinra@customercentria. > com> wrote: > > Hi Timo > FileInputFormat fileInputFormat = new TextInputFormat(new > Path(fileSystem+this.path)); > fileInputFormat.setNestedFileEnumeration(true); > fileInputFormat.setFilesFilter(new UnicaFileFilter(".csv")); > DataStream<String>value =this.execEnv.readFile(fileInputFormat, > fileSystem+this.path, > FileProcessingMode.PROCESS_CONTINUOUSLY, 2L).setParrallelism(1); > > *1) Now if i set parallelism equals to 1 the file get sequentially > processed .* > *2) Modify splits **are being processed on the same task manger > sequentially.* > *3) I want to move the files after being processed.(How to achieve this) .* > > > > > On Mon, Mar 26, 2018 at 3:27 PM, Timo Walther <twal...@apache.org> wrote: > >> Hi Puneet, >> >> can you share a little code example with us? I could not reproduce your >> problem. >> >> You have to keep in mind that a setParallelism() only affects the last >> operation. If you want to change the default parallelism of the entire >> pipeline, you have to change it in StreamExecutionEnvironment. Otherwise >> every following operator will again have the full parallelism which leads >> to a shuffle operation after your source. >> >> I hope this helps. >> >> Regards, >> Timo >> >> >> Am 22.03.18 um 09:17 schrieb Puneet Kinra: >> >> if i set parallelsim equals to 1 still it create multiple splits while >> processing. >> >> -- >> *Cheers * >> >> *Puneet Kinra* >> >> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com >> <puneet.ki...@customercentria.com>* >> >> *e-mail :puneet.ki...@customercentria.com >> <puneet.ki...@customercentria.com>* >> >> >> >> > > > -- > *Cheers * > > *Puneet Kinra* > > *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com > <puneet.ki...@customercentria.com>* > > *e-mail :puneet.ki...@customercentria.com > <puneet.ki...@customercentria.com>* > > > > -- *Cheers * *Puneet Kinra* *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>* *e-mail :puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>*