Hi Robert,

My question was: if I need to read and write a CSV file whose size will be in the GBs, how can I distribute the data sink so that it writes the output into files of exactly 1 GB each? Since I am new to Flink, I am not sure how to do this.
Regards,
Lokesh.r

On Sat, Aug 26, 2017 at 2:56 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Lokesh,
>
> I'm not sure if I fully understood your question. But you cannot write
> the result to a single file from multiple writers.
> If you want to process the data fully distributed, you'll also have to
> write it distributed.
>
> On Wed, Aug 23, 2017 at 8:07 PM, Lokesh R <lokes...@ericsson.com> wrote:
>
>> Hi Team,
>>
>> I am using Apache Flink with Java for the problem statement below:
>>
>> 1. Read a CSV file with the field delimiter character ";".
>> 2. Transform the fields.
>> 3. Write the data back to CSV.
>>
>> My doubts are:
>>
>> 1. If I need to read a CSV file of size above 50 GB, what would be the
>>    approach?
>> 2. If I use parallelism, I am not able to split the data and collect it,
>>    since it is a CSV file, and while writing back to CSV it creates
>>    multiple output files under the default parallelism. How can I achieve
>>    the same?
>>
>> Sample input:
>>
>> 000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>>
>> Sample output:
>>
>> 000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>>
>> Below is the code which I am currently running on a local environment:
>>
>> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>
>> package com.ericsson.voucher;
>>
>> import org.apache.flink.api.common.functions.GroupReduceFunction;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple8;
>> import org.apache.flink.util.Collector;
>>
>> public class Classification {
>>
>>     private static final String OUTPUT_PATH =
>>             "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv";
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         ExecutionEnvironment env = ExecutionEnvironment
>>                 .getExecutionEnvironment();
>>         env.setParallelism(20);
>>
>>         long subViewStartTime = System.currentTimeMillis();
>>
>>         DataSet<Tuple1<String>> rawdata = (DataSet<Tuple1<String>>) env
>>                 .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
>>                 .lineDelimiter("\n").types(String.class);
>>
>>         DataSet<Tuple8<String, String, String, String, String, String, String, String>> mails = rawdata
>>                 .flatMap(new DataExtractor()).rebalance();
>>
>>         mails.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1);
>>         mails.print();
>>
>>         long subViewEndTime = System.currentTimeMillis();
>>         long subViewDifference = subViewEndTime - subViewStartTime;
>>
>>         System.out.println("The Difference Time is" + subViewDifference / 1000 + "seconds");
>>     }
>>
>>     public static class DataExtractor
>>             extends
>>             RichFlatMapFunction<Tuple1<String>, Tuple8<String, String, String, String, String, String, String, String>> {
>>
>>         private static final long serialVersionUID = 1L;
>>
>>         public void flatMap(
>>                 Tuple1<String> paramIN,
>>                 org.apache.flink.util.Collector<Tuple8<String, String, String, String, String, String, String, String>> out)
>>                 throws Exception {
>>             String[] lines = paramIN.f0.split(";");
>>             if (lines != null && lines.length > 0) {
>>                 String vocuherCode = lines[0];
>>                 vocuherCode = vocuherCode + "TEST1";
>>                 String VoucherId = lines[1];
>>                 String voucherNumber = lines[2];
>>                 String status = lines[3] + "TWTSTST";
>>                 String startDate = lines[4] + "";
>>                 String endDate = lines[5] + "";
>>                 String endStatus = lines[6];
>>                 String endVoucherNumber = lines[7];
>>
>>                 out.collect(new Tuple8<String, String, String, String, String, String, String, String>(
>>                         vocuherCode, VoucherId, voucherNumber, status,
>>                         startDate, endDate, endStatus, endVoucherNumber));
>>             }
>>         }
>>     }
>>
>>     public static class RecordReducer
>>             implements
>>             GroupReduceFunction<Tuple8<String, String, String, String, String, String, String, String>,
>>                     Tuple8<String, String, String, String, String, String, String, String>> {
>>
>>         private static final long serialVersionUID = -6045821605365596025L;
>>
>>         @Override
>>         public void reduce(
>>                 Iterable<Tuple8<String, String, String, String, String, String, String, String>> paramIterable,
>>                 Collector<Tuple8<String, String, String, String, String, String, String, String>> paramCollector)
>>                 throws Exception {
>>             String vocuherCode = null;
>>             String VoucherId = null;
>>             String voucherNumber = null;
>>             String status = null;
>>             String startDate = null;
>>             String endDate = null;
>>             String endStatus = null;
>>             String endVoucherNumber = null;
>>             for (Tuple8<String, String, String, String, String, String, String, String> m : paramIterable) {
>>                 vocuherCode = m.f0;
>>                 VoucherId = m.f1;
>>                 voucherNumber = m.f2;
>>                 status = m.f3;
>>                 startDate = m.f4;
>>                 endDate = m.f5;
>>                 endStatus = m.f6;
>>                 endVoucherNumber = m.f7;
>>                 paramCollector
>>                         .collect(new Tuple8<String, String, String, String, String, String, String, String>(
>>                                 vocuherCode, VoucherId, voucherNumber, status,
>>                                 startDate, endDate, endStatus, endVoucherNumber));
>>             }
>>         }
>>     }
>> }
>>
>> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>
>> Please help me with the same: how can I achieve the partitioning of fields
>> on the above data and achieve the parallelism needed to increase the
>> throughput of my application?
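(For reference, a minimal sketch of one possible approach, not a definitive answer: as far as I know the DataSet API used above cannot roll output files at an exact size, but if the sink runs with parallelism N, the output path typically becomes a directory with one part file per parallel subtask, so choosing N from the input size gives files of roughly, not exactly, 1 GB each, assuming rebalance() spreads the records evenly and the transformation does not change the record sizes much. The class name, paths and the field-masking logic below are placeholders.)

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

package com.ericsson.voucher;

import java.io.File;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;

public class DistributedCsvJob {

    // Placeholder paths; with sink parallelism > 1, OUTPUT_PATH is created as a
    // directory containing one part file per parallel subtask.
    private static final String INPUT_PATH  = "C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv";
    private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\OutPut\\output";

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Rough sizing: one parallel writer per ~1 GB of input, so each part file
        // comes out at roughly 1 GB (approximate; Flink does not cut files at an exact size).
        long inputBytes = new File(INPUT_PATH).length();
        int parallelism = Math.max(1, (int) (inputBytes / (1024L * 1024L * 1024L)));
        env.setParallelism(parallelism);

        // Read each line as a single field and split on ";" manually, as in the code above.
        DataSet<Tuple1<String>> lines = env
                .readCsvFile(INPUT_PATH)
                .lineDelimiter("\n")
                .types(String.class);

        DataSet<Tuple1<String>> transformed = lines
                .flatMap(new MaskFields())
                .rebalance(); // spread records evenly across the parallel writers

        // No setParallelism(1) on the sink: every subtask writes its own part file.
        transformed.writeAsCsv(OUTPUT_PATH, "\n", ";");

        env.execute("distributed csv rewrite");
    }

    /** Illustrative transformation only: mask the third field, keep the rest unchanged. */
    public static class MaskFields implements FlatMapFunction<Tuple1<String>, Tuple1<String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(Tuple1<String> line, Collector<Tuple1<String>> out) {
            String[] fields = line.f0.split(";");
            if (fields.length >= 3) {
                fields[2] = "XXXXXXXXX";
                out.collect(new Tuple1<>(String.join(";", fields)));
            }
        }
    }
}

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

If the result really must be a single file, or files of an exact size, that would probably have to happen outside the parallel job, for example by concatenating or splitting the part files after the Flink program has finished.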