Hi Lokesh, I'm not sure I fully understood your question, but you cannot write the result to a single file from multiple parallel writers. If you want to process the data fully distributed, you'll also have to write it distributed.
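To make that concrete, here is a minimal sketch of both options (the input path, field types, parallelism, and class name are placeholders I made up, not taken from your job): with a parallel sink the output path becomes a directory with one part file per sink task; if you really need a single file, you can force just the sink to parallelism 1, but then one task writes all the data.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class ParallelCsvWrite {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8); // read and transform with 8 parallel tasks

        // placeholder input path and schema
        DataSet<Tuple2<String, String>> records = env
                .readCsvFile("/data/input.csv")
                .fieldDelimiter(";")
                .types(String.class, String.class);

        // Distributed write: "/data/output" becomes a directory containing
        // one part file per sink task (8 files here).
        records.writeAsCsv("/data/output", "\n", ";", WriteMode.OVERWRITE);

        // Alternative, single output file: only the sink is forced to
        // parallelism 1, the read and the transformations stay parallel.
        // records.writeAsCsv("/data/single.csv", "\n", ";", WriteMode.OVERWRITE)
        //         .setParallelism(1);

        env.execute("csv write sketch");
    }
}

With the first variant every parallel task writes its own part file, which is what writing "distributed" means here; the commented-out variant produces one file, but the write itself is then no longer parallel.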
On Wed, Aug 23, 2017 at 8:07 PM, Lokesh R <lokes...@ericsson.com> wrote:
> Hi Team,
>
> I am using Apache Flink with Java for the problem statement below:
>
> 1. read a CSV file with field delimiter character ';'
> 2. transform the fields
> 3. write the data back to CSV
>
> My doubts are:
>
> 1. if I need to read a CSV file larger than 50 GB, what would be the
> approach?
> 2. if I use parallelism I am not able to split the data and collect it,
> since it is a CSV file, and while writing back to CSV it creates multiple
> output files with the default parallelism; how can I achieve the same?
>
> Sample input is
>
> 000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>
> and sample output is
>
> 000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>
> Below is the code which I am currently running on a local environment:
>
> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> package com.ericsson.voucher;
>
> import org.apache.flink.api.common.functions.GroupReduceFunction;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple8;
> import org.apache.flink.util.Collector;
>
> public class Classification {
>
>     private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv";
>
>     public static void main(String[] args) throws Exception {
>
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(20);
>
>         long subViewStartTime = System.currentTimeMillis();
>         DataSet<Tuple1<String>> rawdata = env
>                 .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
>                 .lineDelimiter("\n").types(String.class);
>         DataSet<Tuple8<String, String, String, String, String, String, String, String>> mails = rawdata
>                 .flatMap(new DataExtractor()).rebalance();
>         mails.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1);
>         mails.print();
>         long subViewEndTime = System.currentTimeMillis();
>
>         long subViewDifference = subViewEndTime - subViewStartTime;
>         System.out.println("The Difference Time is " + subViewDifference / 1000 + " seconds");
>     }
>
>     public static class DataExtractor
>             extends RichFlatMapFunction<Tuple1<String>, Tuple8<String, String, String, String, String, String, String, String>> {
>
>         private static final long serialVersionUID = 1L;
>
>         public void flatMap(
>                 Tuple1<String> paramIN,
>                 Collector<Tuple8<String, String, String, String, String, String, String, String>> out)
>                 throws Exception {
>             String[] lines = paramIN.f0.split(";");
>             if (lines != null && lines.length > 0) {
>                 String vocuherCode = lines[0];
>                 vocuherCode = vocuherCode + "TEST1";
>                 String VoucherId = lines[1];
>                 String voucherNumber = lines[2];
>                 String status = lines[3] + "TWTSTST";
>                 String startDate = lines[4] + "";
>                 String endDate = lines[5] + "";
>                 String endStatus = lines[6];
>                 String endVoucherNumber = lines[7];
>
>                 out.collect(new Tuple8<String, String, String, String, String, String, String, String>(
>                         vocuherCode, VoucherId, voucherNumber, status,
>                         startDate, endDate, endStatus, endVoucherNumber));
>             }
>         }
>     }
>
>     public static class RecordReducer
>             implements GroupReduceFunction<Tuple8<String, String, String, String, String, String, String, String>,
>                     Tuple8<String, String, String, String, String, String, String, String>> {
>
>         private static final long serialVersionUID = -6045821605365596025L;
>
>         @Override
>         public void reduce(
>                 Iterable<Tuple8<String, String, String, String, String, String, String, String>> paramIterable,
>                 Collector<Tuple8<String, String, String, String, String, String, String, String>> paramCollector)
>                 throws Exception {
>             String vocuherCode = null;
>             String VoucherId = null;
>             String voucherNumber = null;
>             String status = null;
>             String startDate = null;
>             String endDate = null;
>             String endStatus = null;
>             String endVoucherNumber = null;
>             for (Tuple8<String, String, String, String, String, String, String, String> m : paramIterable) {
>                 vocuherCode = m.f0;
>                 VoucherId = m.f1;
>                 voucherNumber = m.f2;
>                 status = m.f3;
>                 startDate = m.f4;
>                 endDate = m.f5;
>                 endStatus = m.f6;
>                 endVoucherNumber = m.f7;
>                 paramCollector.collect(new Tuple8<String, String, String, String, String, String, String, String>(
>                         vocuherCode, VoucherId, voucherNumber, status,
>                         startDate, endDate, endStatus, endVoucherNumber));
>             }
>         }
>     }
> }
>
> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> Please help me on the same: how can I achieve the partitioning of fields on
> the above data and achieve the parallelism to increase the throughput of my
> application?