Hi Lokesh,

I'm not sure if I fully understood your question. But you can not write the
result in a single file from multiple writers.
If you want to process the data fully distributed, you'll also have to
write it distributed.

On Wed, Aug 23, 2017 at 8:07 PM, Lokesh R <lokes...@ericsson.com> wrote:

> Hi Team,
>
> I am using the apache flink with java for below problem statement
>
> 1.where i will read a csv file with field delimeter  character ;
> 2.transform the fields
> 3.write back the data back to csv
>
> my doubts are as below
>
> 1. if i need to read the csv file of size above 50 gb what would be the
> approach
> 2 if i use Parallelism i am not able to split the data and collect it
> since its a csv file
> and while writing a back to csv its creating a multiple files to write the
> data using the default Parallelism how can achieve the same
>
> sample input is
> 000008000077;151139924603;3526358005322;2;29/07/2016:00:
> 00:00;29/07/2018:00:00:00;20;4800019940
>
> and sample output is
>
> 000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/
> 07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
>
>
> below is the code which i am currently running on local environment
>
> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> package com.ericsson.voucher;
>
> import org.apache.flink.api.common.functions.GroupReduceFunction;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple8;
>
> import org.apache.flink.util.Collector;
>
>
> public class Classification {
>
>     private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\
> OutPut\\output.csv";
>
>     public static void main(String[] args) throws Exception {
>
>
>         ExecutionEnvironment env = ExecutionEnvironment
>                 .getExecutionEnvironment();
>         env.setParallelism(20);
>
>          long subViewStartTime = System.currentTimeMillis();
>         DataSet<Tuple1<String>> rawdata = (DataSet<Tuple1<String>>) env
>                 .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
>                 .lineDelimiter("\n").types(String.class);
>     DataSet<Tuple8<String,String, String, String, String, String, String,
> String>> mails = rawdata
>                 .flatMap(new DataExtractor()).rebalance();
>                 mails.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1);
>            mails.print();
>            long subViewEndTime = System.currentTimeMillis();
>
>            long subViewDifference = subViewEndTime - subViewStartTime;
>
>            System.out.println("The Difference Time is"+
> subViewDifference/1000 +"seconds");
>
>     }
>
>     public static class DataExtractor
>             extends
>             RichFlatMapFunction<Tuple1<String>, Tuple8<String, String,
> String, String, String, String, String, String>> {
>
>         /**
>          *
>          */
>         private static final long serialVersionUID = 1L;
>
>         public void flatMap(
>                 Tuple1<String> paramIN,
>                 org.apache.flink.util.Collector<Tuple8<String, String,
> String, String, String, String, String, String>> out)
>                 throws Exception {
>             String[] lines = paramIN.f0.split(";");
>             if (lines != null && lines.length > 0) {
>                 String vocuherCode =lines[0];
>                 vocuherCode=vocuherCode+"TEST1";
>                 String VoucherId =  lines[1];
>                 String voucherNumber = lines[2];
>                 String status = lines[3]+"TWTSTST";
>                 String startDate = lines[4] + "";
>                 String endDate = lines[5] + "";
>                 String endStatus = lines[6];
>                 String endVoucherNumber = lines[7];
>
>
>
>
>
>             out.collect(new Tuple8<String, String, String, String, String,
> String, String, String>(
>                     vocuherCode, VoucherId, voucherNumber, status,
>                     startDate, endDate, endStatus, endVoucherNumber));
>             }
>
>         }
>
>     }
>
>     public static class RecordReducer
>             implements
>             GroupReduceFunction<Tuple8<String, String, String, String,
> String, String, String, String>,
>             Tuple8<String, String, String, String, String, String, String,
> String>> {
>
>
>         /**
>          *
>          */
>         private static final long serialVersionUID = -6045821605365596025L;
>
>         @Override
>         public void reduce(
>                 Iterable<Tuple8<String, String, String, String, String,
> String, String, String>> paramIterable,
>                 Collector<Tuple8<String, String, String, String, String,
> String, String, String>> paramCollector)
>                 throws Exception {
>             // TODO Auto-generated method stub
>             String vocuherCode = null;
>             String VoucherId = null;
>             String voucherNumber = null;
>             String status = null;
>             String startDate = null;
>             String endDate = null;
>             String endStatus = null;
>             String endVoucherNumber = null;
>             for (Tuple8<String, String, String, String, String, String,
> String, String> m : paramIterable) {
>                 vocuherCode = m.f0;
>                 VoucherId = m.f1;
>                 voucherNumber = m.f2;
>                 status = m.f3;
>                 startDate = m.f4;
>                 endDate = m.f5;
>                 endStatus = m.f6;
>                 endVoucherNumber = m.f7;
>                 paramCollector
>                 .collect(new Tuple8<String, String, String, String,
> String, String, String, String>(
>                         vocuherCode, VoucherId, voucherNumber, status,
>                         startDate, endDate, endStatus, endVoucherNumber));
>
>             }
>
>
>
>         }
>     }
>
>
> }
>
> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>
> please help me on the same how can I achieve the portioning of fields on
> the above data and achieve the parallism to increase the throughput of my
> application
>
>
>
>
>
>
>

Reply via email to