I would not set

> ExecutionEnvironment env =
>     ExecutionEnvironment.createLocalEnvironment().setParallelism(1);

because this changes the default parallelism of *all* operators to one.
Instead, set the parallelism of only the **sink** to one (as described here:
https://stackoverflow.com/questions/32580970/writeascsv-and-writeastext-is-unexpected/32581813#32581813):

filteredData.writeAsText("file:///output1.txt").setParallelism(1);

-Matthias

On 11/21/2015 02:23 PM, Márton Balassi wrote:
> Additionally, as having multiple files under /output1.txt is standard in the
> Hadoop ecosystem, you can transparently read all the files with
> env.readTextFile("/output1.txt").
>
> You can also set parallelism on individual operators (e.g. the file writer)
> if you really need a single output.
>
> On Fri, Nov 20, 2015, 21:27 Suneel Marthi <smar...@apache.org> wrote:
>
>> You can write to a single output file by setting parallelism == 1:
>>
>> final ExecutionEnvironment env =
>>     ExecutionEnvironment.createLocalEnvironment().setParallelism(1);
>>
>> The reason you see multiple output files is that each worker is writing
>> to a different file.
>>
>> On Fri, Nov 20, 2015 at 10:06 PM, jun aoki <ja...@apache.org> wrote:
>>
>>> Hi Flink community
>>>
>>> I know I'm mistaken but could not find what I want.
>>>
>>> final ExecutionEnvironment env =
>>>     ExecutionEnvironment.createLocalEnvironment();
>>> DataSet<String> data = env.readTextFile("file:///text1.txt");
>>> FilterFunction<String> filter = new MyFilterFunction(); // looks for a
>>> line that starts with "[ERROR]"
>>> DataSet<String> filteredData = data.filter(filter);
>>> filteredData.writeAsText("file:///output1.txt");
>>> env.execute();
>>>
>>> I then expect to get a single file /output1.txt, but actually get
>>> /output1.txt/1, /output1.txt/2, /output1.txt/3...
>>> I assumed I would get a single file because the method signature says
>>> writeAsText(String filePath) <-- filePath instead of directoryPath.
>>> The Javadoc comment also suggests my assumption was right:
>>>
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java#L1354
>>>
>>> Can anyone tell me if the method signature and documentation should be
>>> fixed, or if I am missing some configuration?
>>>
>>> --
>>> -jun
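Matthias' suggestion (keep the default parallelism so upstream operators still run in parallel, and set parallelism to one on the sink only) can be sketched as below. This is a hedged sketch, not the thread author's exact program: the anonymous filter stands in for the thread's MyFilterFunction, and the file paths are the placeholder paths from the original question.

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SingleFileOutput {

    public static void main(String[] args) throws Exception {
        // Do NOT call setParallelism(1) here: that would force *every*
        // operator, not just the sink, down to a single parallel instance.
        final ExecutionEnvironment env =
                ExecutionEnvironment.createLocalEnvironment();

        DataSet<String> data = env.readTextFile("file:///text1.txt");

        // Inline stand-in for the thread's MyFilterFunction (hypothetical):
        // keeps only lines that start with "[ERROR]". This still runs with
        // the environment's default parallelism.
        DataSet<String> filteredData = data.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) {
                return line.startsWith("[ERROR]");
            }
        });

        // Parallelism 1 on the sink only: a single writer task produces a
        // single /output1.txt file instead of a directory of numbered
        // part files (/output1.txt/1, /output1.txt/2, ...).
        filteredData.writeAsText("file:///output1.txt").setParallelism(1);

        env.execute();
    }
}
```

The key point is that writeAsText() returns the sink object, so setParallelism(1) chained onto it applies to the sink alone, while the read and filter keep whatever parallelism the environment defaults to.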