Hi Deep,

You can try to change the `FileProcessingMode.PROCESS_ONCE` to `FileProcessingMode.PROCESS_CONTINUOUSLY`.
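For example, a minimal sketch of that change, reusing the `csvInputFormat`, `directory`, filter and parallelism from the snippet in your earlier mail (quoted below); the 60s monitoring interval is just an illustrative value:

    // Keep monitoring the directory instead of reading it only once.
    // With PROCESS_CONTINUOUSLY the source re-scans the path every 60 seconds
    // and picks up files that appear later. Note that if an existing file is
    // modified, its whole content is re-processed.
    return environment
            .readFile(csvInputFormat, directory,
                    FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L,
                    S3Service.createCustomFilter(finalParameters))
            .name("Source: Custom File Reader for path " + directory)
            .setParallelism(readerParallelism);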
Best,
Wei

On Dec 15, 2020, at 20:18, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

Hi Wei,
Could you please suggest how to fix the below issue?

Thanks & Regards,
Deep

On Mon, 14 Dec 2020, 10:28 AM DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

Hi Wei,
No problem at all. Thanks for your response.
Yes, it is just starting from the beginning, as if no checkpointing had finished.

Thanks,
-Deep

On Mon, 14 Dec 2020, 8:01 AM Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Deep,

Sorry for the late reply. Could you provide more specific information about the problem? E.g. did the job skip the file that was being processed during the last checkpoint, or did it start from the beginning as if no checkpointing had finished?

Best,
Wei

On Dec 12, 2020, at 13:14, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

Hi Wei,
I'm sorry to bother you; could you please help me clarify the doubt I mentioned in my previous email?

Thank you in advance.

Regards,
-Deep

On Fri, 11 Dec 2020, 2:16 PM DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

Hi Wei,
I just want to clarify my doubt about checkpointing as part of the S3 datastream source. Let's say my job started with the current resources and failed in between because of a lack of resources (e.g. a heap space exception). In that case, what I observed was that if the job is auto-restarted by the restart strategy, it was not processing the data from the last checkpoint.

Could you please help me with how to handle this case as part of the S3 data source?

Thanks,
-Deep

On Tue, Dec 8, 2020 at 10:22 PM DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

Hi Wei,
Thank you for the clarification. I have implemented the suggested approach and it is working fine now. 🙂

Thanks,
-Deep

On Tue, 8 Dec 2020, 5:24 PM Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Deep,

It seems that the TypeInformation array in your code has 2 elements, but we only need one here. This approach treats the entire csv file as a Row with only one column, so there should be only one `BasicTypeInfo.STRING_TYPE_INFO` in the array. Alternatively, if you use the TextInputFormat instead of the RowCsvInputFormat, this problem can also be solved.

If you have created your own InputFormat by extending the RowCsvInputFormat, you can get the current file path via `this.currentSplit.getPath()` in your class. Note that if you choose to fill the file path into the second column of the Row, you do not need to make the above change, because in that case we really do need the TypeInformation array to contain two StringTypeInfo elements.

Best,
Wei

On Dec 8, 2020, at 19:29, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

Hi Wei,

I also need to know how to get the file name along with the single-Row data as part of the DataStream at runtime, so that I can extract some of the data from the file name in the next operator to construct the final JSON string.

Thanks,
-Deep
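A rough, untested sketch of the approach Wei describes above: extend RowCsvInputFormat and overwrite the second column of each Row with the path from `this.currentSplit`. The class name below is made up, and it assumes the two-column TypeInformation setup (and the lenient parsing change) from the snippet further down:

    // illustrative subclass; assumes RowCsvInputFormat is subclassable in your
    // Flink version, otherwise the same line can go into your modified copy of it
    public class FilePathRowCsvInputFormat extends RowCsvInputFormat {

        public FilePathRowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes) {
            super(filePath, fieldTypes);
        }

        @Override
        public Row readRecord(Row reuse, byte[] bytes, int offset, int numBytes) throws IOException {
            Row row = super.readRecord(reuse, bytes, offset, numBytes);
            if (row != null) {
                // currentSplit is the FileInputSplit currently being read;
                // put its path into the second column so the next operator can use it
                row.setField(1, currentSplit.getPath().toString());
            }
            return row;
        }
    }

It would be constructed in place of `new RowCsvInputFormat(...)` in the snippet below, with the same delimiter and enumeration settings.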
On Tue, Dec 8, 2020 at 4:10 PM DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

Hi Wei,

Please find the below code snippet:

    TypeInformation[] typeInformation = new TypeInformation[]{
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
    RowCsvInputFormat csvInputFormat =
            new RowCsvInputFormat(new org.apache.flink.core.fs.Path(directory), typeInformation);
    csvInputFormat.setDelimiter((char) 0);
    csvInputFormat.setFieldDelimiter(String.valueOf((char) 0));
    csvInputFormat.setNestedFileEnumeration(true);
    csvInputFormat.setMinSplitSize(10);
    return environment
            .readFile(csvInputFormat, directory, FileProcessingMode.PROCESS_ONCE, -1,
                    S3Service.createCustomFilter(finalParameters))
            .name("Source: Custom File Reader for path " + directory)
            .setParallelism(readerParallelism);

But after that, I created my own custom RowCsvInputFormat, enabled csvInputFormat.setLenient(true) and modified the class a little bit, and then it worked:

    // check valid start position
    if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) {
        if (isLenient()) {
            return true;
        } else {
            throw new ParseException("Row too short: "
                    + new String(bytes, offset, numBytes, getCharset()));
        }
    }

Let me know if you need any details.

Thanks,
-Deep

On Tue, Dec 8, 2020 at 8:13 AM Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Deep,

Could you show your current code snippet? I have tried the csv file data on my local machine and it works fine, so I guess something might be wrong elsewhere.

Best,
Wei

On Dec 8, 2020, at 03:20, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

Hi Wei and Till,
Thanks for the quick reply.

@Wei, I tried the code which you suggested and it is working fine, but I have one use case where it is failing. Below is the csv input data format:

Csv file data format:
-------------------------------
field_id,data,
A,1
B,3
C,4
D,9
E,0,0,0,0

Because the last row contains more than two values, it is throwing org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,

How to handle the above corner case? Could you please suggest some way to handle this?

@Till, could you please elaborate more on what you are suggesting? As per my use case I am dealing with multiple csv files under the given folder, and reading line by line using TextInputFormat and transforming with a map operator will not work. Correct me if I'm wrong.

Thanks & Regards,
-Deep

On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <trohrm...@apache.org> wrote:

Hi Deep,

Could you use the TextInputFormat, which reads a file line by line? That way you can do the JSON parsing as part of a mapper which consumes the file lines.
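Something along these lines (a rough, untested sketch; the S3 path and the pair parsing are only placeholders):

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String path = "s3://...";  // your directory

    TextInputFormat format = new TextInputFormat(new Path(path));
    format.setNestedFileEnumeration(true);

    DataStream<String> lines = env.readFile(
            format, path, FileProcessingMode.PROCESS_ONCE, -1, BasicTypeInfo.STRING_TYPE_INFO);

    // parse each "field_id,data" line in a mapper, e.g. into a (field_id, data) pair,
    // and build the JSON in a later operator
    DataStream<Tuple2<String, String>> fields = lines
            .filter(line -> !line.startsWith("field_id"))   // skip the header line
            .map(line -> {
                String[] parts = line.split(",", 2);
                return Tuple2.of(parts[0], parts[1]);
            })
            .returns(Types.TUPLE(Types.STRING, Types.STRING));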
Cheers,
Till

On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Deep,

(redirecting this to the user mailing list as this is not a dev question)

You can try to set the line delimiter and field delimiter of the RowCsvInputFormat to a non-printing character (assuming there are no non-printing characters in the csv files). It will read all the content of a csv file into one Row. e.g.:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String path = "test";
    TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO};
    RowCsvInputFormat csvFormat = new RowCsvInputFormat(new Path(path), fieldTypes);
    csvFormat.setNestedFileEnumeration(true);
    csvFormat.setDelimiter((char) 0);
    csvFormat.setFieldDelimiter(String.valueOf((char) 0));
    DataStream<Row> lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
    lines.map(value -> value).print();
    env.execute();

Then you can convert the content of the csv files to json manually.

Best,
Wei

On Dec 7, 2020, at 19:10, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

Hi Guys,

Below is my code snippet, which reads all csv files under the given folder row by row, but my requirement is to read one csv file at a time and convert it to json, which will look like:

{"A":"1","B":"3","C":"4","D":9}

Csv file data format:
-------------------------------
field_id,data,
A,1
B,3
C,4
D,9

Code snippet:
--------------------------

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
    TypeInformation[] fieldTypes = new TypeInformation[]{
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
    RowCsvInputFormat csvFormat = new RowCsvInputFormat(new Path(path), fieldTypes);
    csvFormat.setSkipFirstLineAsHeader(true);
    csvFormat.setNestedFileEnumeration(true);
    DataStream<Row> lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
    lines.map(value -> value).print();

Any help is highly appreciated.

Thanks,
-Deep
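For reference, a rough sketch of the manual csv-to-json conversion Wei mentions above, building on the `lines` stream from his snippet (the whole file arrives as the single String column of each Row; the parsing below is illustrative only and quotes every value):

    DataStream<String> json = lines.map(row -> {
        // the entire csv file content is in the first (and only) column
        String content = (String) row.getField(0);
        StringBuilder sb = new StringBuilder("{");
        for (String line : content.split("\\r?\\n")) {
            if (line.isEmpty() || line.startsWith("field_id")) {
                continue;  // skip blank lines and the header
            }
            String[] parts = line.split(",", 2);  // field_id,data
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(parts[0]).append("\":\"").append(parts[1]).append('"');
        }
        return sb.append('}').toString();
    });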