Hi Deep,

Could you use the TextInputFormat, which reads a file line by line? That way you can do the JSON parsing as part of a mapper which consumes the file lines.

Cheers,
Till

On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <weizhong0...@gmail.com> wrote:

> Hi Deep,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> You can try to set the line delimiter and field delimiter of the
> RowCsvInputFormat to a non-printing character (assuming there are no
> non-printing characters in the csv files). It will read all the content
> of a csv file into one Row, e.g.:
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> String path = "test";
> TypeInformation[] fieldTypes = new TypeInformation[]{
>     BasicTypeInfo.STRING_TYPE_INFO};
> RowCsvInputFormat csvFormat =
>     new RowCsvInputFormat(new Path(path), fieldTypes);
> csvFormat.setNestedFileEnumeration(true);
> csvFormat.setDelimiter((char) 0);
> csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> DataStream<Row> lines =
>     env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
> lines.map(value -> value).print();
> env.execute();
>
> Then you can convert the content of the csv files to json manually.
>
> Best,
> Wei
>
> On Dec 7, 2020, at 19:10, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:
>
> Hi Guys,
>
> Below is my code snippet, which reads all csv files under the given folder
> row by row, but my requirement is to read one csv file at a time and
> convert it to json, which will look like:
> {"A":"1","B":"3","C":"4","D":9}
>
> Csv file data format:
> -------------------------------
> field_id,data,
> A,1
> B,3
> C,4
> D,9
>
> Code snippet:
> --------------------------
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
> TypeInformation[] fieldTypes = new TypeInformation[]{
>     BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO};
> RowCsvInputFormat csvFormat = new RowCsvInputFormat(
>     new Path(path), fieldTypes);
> csvFormat.setSkipFirstLineAsHeader(true);
> csvFormat.setNestedFileEnumeration(true);
> DataStream<Row> lines =
>     env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
> lines.map(value -> value).print();
>
> Any help is highly appreciated.
>
> Thanks,
> -Deep
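
The "convert the content of the csv files to json manually" step suggested in the thread could look roughly like the sketch below. It is plain Java with no Flink dependency and assumes the whole file content is already available as one String (for example the single field of the Row produced by the delimiter trick). The class name CsvToJson and the methods toJson and escape are hypothetical names chosen for illustration, not part of any Flink API. It also assumes a header line followed by two-column key,value rows, and it emits every value as a JSON string; it does not try to reproduce the unquoted 9 shown in the requested output.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not a Flink class: turns the text of one
// two-column CSV file into a flat JSON object.
class CsvToJson {

    // Escapes backslashes and double quotes so the text stays a valid JSON string.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Assumes the first line is a header and every other line is "key,value".
    // All values are written as JSON strings; no type inference is attempted.
    static String toJson(String csvContent) {
        Map<String, String> fields = new LinkedHashMap<>(); // keeps file order
        String[] lines = csvContent.split("\\r?\\n");
        for (int i = 1; i < lines.length; i++) {            // i = 1 skips the header
            String line = lines[i].trim();
            if (line.isEmpty()) {
                continue;                                   // ignore blank lines
            }
            String[] parts = line.split(",", 2);
            if (parts.length < 2) {
                continue;                                   // ignore rows without a value
            }
            fields.put(parts[0].trim(), parts[1].trim());
        }
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            json.add("\"" + escape(e.getKey()) + "\":\"" + escape(e.getValue()) + "\"");
        }
        return json.toString();
    }

    public static void main(String[] args) {
        String csv = "field_id,data\nA,1\nB,3\nC,4\nD,9\n";
        System.out.println(toJson(csv)); // prints {"A":"1","B":"3","C":"4","D":"9"}
    }
}
```

In a Flink job, a helper like this would presumably be called from the map function that currently just passes each Row through, replacing `value -> value` with a lambda that extracts the file content and converts it.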