Hi Deep,

It seems that the TypeInformation array in your code has 2 elements, but we 
only need one here. This approach treats the entire csv file as a Row which has 
only a one column, so there should be only one `BasicTypeInfo.STRING_TYPE_INFO` 
in the array. And if you use the TextInputFormat instead of the 
RowCsvInputFormat, this problem can also be solved.

If you have created your own InputFormat via extending the RowCsvInputFormat, 
you can get the current file path via `this.currentSplit.getPath()` in your 
class. Note that if you choose to fill the file path into the second column of 
the Row, you do not need to make the above changes, because at this time we 
really need the TypeInformation array to contain two StringTypeInfo elements.

Best,
Wei


> 在 2020年12月8日,19:29,DEEP NARAYAN Singh <about.d...@gmail.com> 写道:
> 
> Hi Wei, 
> 
> Also I need to know how to get file names  along with single Row data as part 
> of Datastream during runtime.So that I can  extract some of the data from the 
> file name in the next operator to construct the final json string.
> 
> Thanks,
> -Deep
> 
> On Tue, Dec 8, 2020 at 4:10 PM DEEP NARAYAN Singh <about.d...@gmail.com 
> <mailto:about.d...@gmail.com>> wrote:
> Hi Wei, 
> 
> Please find the below code snippet:
> TypeInformation[] typeInformation = new 
> TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO};
> RowCsvInputFormat csvInputFormat = new RowCsvInputFormat(new 
> org.apache.flink.core.fs.Path(directory), typeInformation);
> csvInputFormat.setDelimiter((char) 0);
> csvInputFormat.setFieldDelimiter(String.valueOf((char) 0));
> csvInputFormat.setNestedFileEnumeration(true);
> csvInputFormat.setMinSplitSize(10);
> return environment
>         .readFile(csvInputFormat, directory, FileProcessingMode.PROCESS_ONCE, 
> -1, S3Service.createCustomFilter(finalParameters))
>         .name("Source: Custom File Reader for path " + 
> directory).setParallelism(readerParallelism);
> 
> But after that,I have created my own custom RowCsvInputFormat and enabled the 
> csvInputFormat.setLenient(true) and modified the class a little bit then it 
> worked.
> 
> // check valid start position
> if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 
> 1)) {
>    if (isLenient()) {
>       return true;
>    } else {
>       throw new ParseException("Row too short: " + new String(bytes, offset, 
> numBytes, getCharset()));
>    }
> }
> Let me know if you need any details.
> Thanks,
> -Deep
> 
> 
> 
> 
> 
> On Tue, Dec 8, 2020 at 8:13 AM Wei Zhong <weizhong0...@gmail.com 
> <mailto:weizhong0...@gmail.com>> wrote:
> Hi Deep,
> 
> Could you show your current code snippet? I have tried the Csv file data on 
> my local machine and it works fine, so I guess what might be wrong elsewhere.
> 
> Best,
> Wei
> 
> 
> > 在 2020年12月8日,03:20,DEEP NARAYAN Singh <about.d...@gmail.com 
> > <mailto:about.d...@gmail.com>> 写道:
> > 
> > Hi Wei and Till,
> > Thanks for the quick reply.
> > 
> > @Wei, I tried with code which you have suggested and it is working fine but 
> > I have one use case where it is failing, below is the csv input data format 
> > :
> > Csv file data format   :
> > -------------------------------
> > field_id,data,
> > A,1
> > B,3
> > C,4
> > D,9
> > E,0,0,0,0
> > 
> > because of last row which contains more that two value, and its is throwing 
> > org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,
> > 
> > How to handle the above corner case.Could you please suggest some way to 
> > handle this.
> > 
> > @Till, Could you please elaborate more which you are suggesting? As per my 
> > use case I am dealing with multiple csv files under the given folder and 
> > reading line by line using TextInputFormat  and transform will not work by 
> > using map operator. Correct me if i'm wrong .
> > 
> > Thanks & Regards,
> > -Deep
> > 
> > 
> > On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <trohrm...@apache.org 
> > <mailto:trohrm...@apache.org>> wrote:
> > Hi Deep,
> > 
> > Could you use the TextInputFormat which reads a file line by line? That way
> > you can do the JSON parsing as part of a mapper which consumes the file
> > lines.
> > 
> > Cheers,
> > Till
> > 
> > On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <weizhong0...@gmail.com 
> > <mailto:weizhong0...@gmail.com>> wrote:
> > 
> > > Hi Deep,
> > >
> > > (redirecting this to user mailing list as this is not a dev question)
> > >
> > > You can try to set the line delimiter and field delimiter of the
> > > RowCsvInputFormat to a non-printing character (assume there is no 
> > > non-printing
> > > characters in the csv files). It will read all the content of a csv file
> > > into one Row. e.g.
> > >
> > > final StreamExecutionEnvironment env =
> > >    StreamExecutionEnvironment.getExecutionEnvironment();
> > > String path = "test";
> > > TypeInformation[] fieldTypes = new TypeInformation[]{
> > >    BasicTypeInfo.STRING_TYPE_INFO};
> > > RowCsvInputFormat csvFormat =
> > >    new RowCsvInputFormat(new Path(path), fieldTypes);
> > > csvFormat.setNestedFileEnumeration(true);
> > > csvFormat.setDelimiter((char) 0);
> > > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> > > DataStream<Row>
> > >    lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> > >    -1);lines.map(value -> value).print();
> > > env.execute();
> > >
> > >
> > > Then you can convert the content of the csv files to json manually.
> > >
> > > Best,
> > > Wei
> > >
> > >
> > > 在 2020年12月7日,19:10,DEEP NARAYAN Singh <about.d...@gmail.com 
> > > <mailto:about.d...@gmail.com>> 写道:
> > >
> > > Hi  Guys,
> > >
> > > Below is my code snippet , which read all csv files under the given folder
> > > row by row but my requirement is to read csv file at a time and  convert 
> > > as
> > > json which will looks like :
> > > {"A":"1","B":"3","C":"4","D":9}
> > >
> > > Csv file data format   :
> > > -------------------------------
> > > *field_id,data,*
> > >
> > >
> > >
> > > *A,1B,3C,4D,9*
> > >
> > > Code snippet:
> > > --------------------------
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > *final StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();String path =
> > > "s3://messages/data/test/dev/2020-12-07/67241306/";TypeInformation[]
> > > fieldTypes = new TypeInformation[]{      BasicTypeInfo.STRING_TYPE_INFO,
> > >  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =      new
> > > RowCsvInputFormat(            new Path(path),
> > >
> > > fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream<Row>
> > > lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
> > > -1);lines.map(value -> value).print();*
> > >
> > >
> > > Any help is highly appreciated.
> > >
> > > Thanks,
> > > -Deep
> > >
> > >
> > >
> 

Reply via email to