Hi Deep,

You can try to change the `FileProcessingMode.PROCESS_ONCE` to 
`FileProcessingMode.PROCESS_CONTINUOUSLY`.

Best,
Wei

> 在 2020年12月15日,20:18,DEEP NARAYAN Singh <about.d...@gmail.com> 写道:
> 
> Hi Wei,
> Could you please suggest , how to fix this below issues.
> 
> Thanks & Regards,
> Deep
> 
> On Mon, 14 Dec, 2020, 10:28 AM DEEP NARAYAN Singh, <about.d...@gmail.com 
> <mailto:about.d...@gmail.com>> wrote:
> Hi Wei,
> No problem at all.Thanks for your response.
> Yes ,it is just starting from the beginning like no check pointing finished.
> 
> Thanks,
> -Deep
> 
> On Mon, 14 Dec, 2020, 8:01 AM Wei Zhong, <weizhong0...@gmail.com 
> <mailto:weizhong0...@gmail.com>> wrote:
> Hi Deep,
> 
> Sorry for the late reply. Could you provide more specific information about 
> the problem? e.g. did the job skip the file that was being processed during 
> the last checkpointing, or did it start from the beginning just like no 
> checkpointing finished?
> 
> Best,
> Wei
> 
>> 在 2020年12月12日,13:14,DEEP NARAYAN Singh <about.d...@gmail.com 
>> <mailto:about.d...@gmail.com>> 写道:
>> 
>> Hi Wei,
>> I'm sorry to bother you ,could you please help me in clarifying my doubt 
>> which have mentioned in previous email?
>> 
>> Thank you in advance.
>> 
>> Regards,
>> -Deep
>> 
>> On Fri, 11 Dec, 2020, 2:16 PM DEEP NARAYAN Singh, <about.d...@gmail.com 
>> <mailto:about.d...@gmail.com>> wrote:
>> Hi Wei,
>> Just I want to clarify my doubt about check pointing as part of s3 
>> datastream source . Let say my job started with a current resource and it 
>> got failed in between because of some lack of resource (e.g  Heap space 
>> Exception etc.), In that case what I observed was that if the job is auto 
>> restart by using restart strategy , it was not processing the data from the 
>> last checkpointing .
>> 
>> Could you please help me in how to handle this case as part of s3 data 
>> source.
>> 
>> Thanks,
>> -Deep
>> 
>> On Tue, Dec 8, 2020 at 10:22 PM DEEP NARAYAN Singh <about.d...@gmail.com 
>> <mailto:about.d...@gmail.com>> wrote:
>> Hi Wei,
>> Thanks you for the clarification. I have implemented the suggest approach 
>> and it is working fine now.🙂
>> 
>> Thanks,
>> -Deep
>> 
>> On Tue, 8 Dec, 2020, 5:24 PM Wei Zhong, <weizhong0...@gmail.com 
>> <mailto:weizhong0...@gmail.com>> wrote:
>> Hi Deep,
>> 
>> It seems that the TypeInformation array in your code has 2 elements, but we 
>> only need one here. This approach treats the entire csv file as a Row which 
>> has only a one column, so there should be only one 
>> `BasicTypeInfo.STRING_TYPE_INFO` in the array. And if you use the 
>> TextInputFormat instead of the RowCsvInputFormat, this problem can also be 
>> solved.
>> 
>> If you have created your own InputFormat via extending the 
>> RowCsvInputFormat, you can get the current file path via 
>> `this.currentSplit.getPath()` in your class. Note that if you choose to fill 
>> the file path into the second column of the Row, you do not need to make the 
>> above changes, because at this time we really need the TypeInformation array 
>> to contain two StringTypeInfo elements.
>> 
>> Best,
>> Wei
>> 
>> 
>>> 在 2020年12月8日,19:29,DEEP NARAYAN Singh <about.d...@gmail.com 
>>> <mailto:about.d...@gmail.com>> 写道:
>>> 
>>> Hi Wei, 
>>> 
>>> Also I need to know how to get file names  along with single Row data as 
>>> part of Datastream during runtime.So that I can  extract some of the data 
>>> from the file name in the next operator to construct the final json string.
>>> 
>>> Thanks,
>>> -Deep
>>> 
>>> On Tue, Dec 8, 2020 at 4:10 PM DEEP NARAYAN Singh <about.d...@gmail.com 
>>> <mailto:about.d...@gmail.com>> wrote:
>>> Hi Wei, 
>>> 
>>> Please find the below code snippet:
>>> TypeInformation[] typeInformation = new 
>>> TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, 
>>> BasicTypeInfo.STRING_TYPE_INFO};
>>> RowCsvInputFormat csvInputFormat = new RowCsvInputFormat(new 
>>> org.apache.flink.core.fs.Path(directory), typeInformation);
>>> csvInputFormat.setDelimiter((char) 0);
>>> csvInputFormat.setFieldDelimiter(String.valueOf((char) 0));
>>> csvInputFormat.setNestedFileEnumeration(true);
>>> csvInputFormat.setMinSplitSize(10);
>>> return environment
>>>         .readFile(csvInputFormat, directory, 
>>> FileProcessingMode.PROCESS_ONCE, -1, 
>>> S3Service.createCustomFilter(finalParameters))
>>>         .name("Source: Custom File Reader for path " + 
>>> directory).setParallelism(readerParallelism);
>>> 
>>> But after that,I have created my own custom RowCsvInputFormat and enabled 
>>> the csvInputFormat.setLenient(true) and modified the class a little bit 
>>> then it worked.
>>> 
>>> // check valid start position
>>> if (startPos > limit || (startPos == limit && field != fieldIncluded.length 
>>> - 1)) {
>>>    if (isLenient()) {
>>>       return true;
>>>    } else {
>>>       throw new ParseException("Row too short: " + new String(bytes, 
>>> offset, numBytes, getCharset()));
>>>    }
>>> }
>>> Let me know if you need any details.
>>> Thanks,
>>> -Deep
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Dec 8, 2020 at 8:13 AM Wei Zhong <weizhong0...@gmail.com 
>>> <mailto:weizhong0...@gmail.com>> wrote:
>>> Hi Deep,
>>> 
>>> Could you show your current code snippet? I have tried the Csv file data on 
>>> my local machine and it works fine, so I guess what might be wrong 
>>> elsewhere.
>>> 
>>> Best,
>>> Wei
>>> 
>>> 
>>> > 在 2020年12月8日,03:20,DEEP NARAYAN Singh <about.d...@gmail.com 
>>> > <mailto:about.d...@gmail.com>> 写道:
>>> > 
>>> > Hi Wei and Till,
>>> > Thanks for the quick reply.
>>> > 
>>> > @Wei, I tried with code which you have suggested and it is working fine 
>>> > but I have one use case where it is failing, below is the csv input data 
>>> > format :
>>> > Csv file data format   :
>>> > -------------------------------
>>> > field_id,data,
>>> > A,1
>>> > B,3
>>> > C,4
>>> > D,9
>>> > E,0,0,0,0
>>> > 
>>> > because of last row which contains more that two value, and its is 
>>> > throwing org.apache.flink.api.common.io.ParseException: Row too short: 
>>> > field_id,data,
>>> > 
>>> > How to handle the above corner case.Could you please suggest some way to 
>>> > handle this.
>>> > 
>>> > @Till, Could you please elaborate more which you are suggesting? As per 
>>> > my use case I am dealing with multiple csv files under the given folder 
>>> > and reading line by line using TextInputFormat  and transform will not 
>>> > work by using map operator. Correct me if i'm wrong .
>>> > 
>>> > Thanks & Regards,
>>> > -Deep
>>> > 
>>> > 
>>> > On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <trohrm...@apache.org 
>>> > <mailto:trohrm...@apache.org>> wrote:
>>> > Hi Deep,
>>> > 
>>> > Could you use the TextInputFormat which reads a file line by line? That 
>>> > way
>>> > you can do the JSON parsing as part of a mapper which consumes the file
>>> > lines.
>>> > 
>>> > Cheers,
>>> > Till
>>> > 
>>> > On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <weizhong0...@gmail.com 
>>> > <mailto:weizhong0...@gmail.com>> wrote:
>>> > 
>>> > > Hi Deep,
>>> > >
>>> > > (redirecting this to user mailing list as this is not a dev question)
>>> > >
>>> > > You can try to set the line delimiter and field delimiter of the
>>> > > RowCsvInputFormat to a non-printing character (assume there is no 
>>> > > non-printing
>>> > > characters in the csv files). It will read all the content of a csv file
>>> > > into one Row. e.g.
>>> > >
>>> > > final StreamExecutionEnvironment env =
>>> > >    StreamExecutionEnvironment.getExecutionEnvironment();
>>> > > String path = "test";
>>> > > TypeInformation[] fieldTypes = new TypeInformation[]{
>>> > >    BasicTypeInfo.STRING_TYPE_INFO};
>>> > > RowCsvInputFormat csvFormat =
>>> > >    new RowCsvInputFormat(new Path(path), fieldTypes);
>>> > > csvFormat.setNestedFileEnumeration(true);
>>> > > csvFormat.setDelimiter((char) 0);
>>> > > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
>>> > > DataStream<Row>
>>> > >    lines = env.readFile(csvFormat, path, 
>>> > > FileProcessingMode.PROCESS_ONCE,
>>> > >    -1);lines.map(value -> value).print();
>>> > > env.execute();
>>> > >
>>> > >
>>> > > Then you can convert the content of the csv files to json manually.
>>> > >
>>> > > Best,
>>> > > Wei
>>> > >
>>> > >
>>> > > 在 2020年12月7日,19:10,DEEP NARAYAN Singh <about.d...@gmail.com 
>>> > > <mailto:about.d...@gmail.com>> 写道:
>>> > >
>>> > > Hi  Guys,
>>> > >
>>> > > Below is my code snippet , which read all csv files under the given 
>>> > > folder
>>> > > row by row but my requirement is to read csv file at a time and  
>>> > > convert as
>>> > > json which will looks like :
>>> > > {"A":"1","B":"3","C":"4","D":9}
>>> > >
>>> > > Csv file data format   :
>>> > > -------------------------------
>>> > > *field_id,data,*
>>> > >
>>> > >
>>> > >
>>> > > *A,1B,3C,4D,9*
>>> > >
>>> > > Code snippet:
>>> > > --------------------------
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > > *final StreamExecutionEnvironment env =
>>> > > StreamExecutionEnvironment.getExecutionEnvironment();String path =
>>> > > "s3://messages/data/test/dev/2020-12-07/67241306/ <>";TypeInformation[]
>>> > > fieldTypes = new TypeInformation[]{      BasicTypeInfo.STRING_TYPE_INFO,
>>> > >  BasicTypeInfo.STRING_TYPE_INFO};RowCsvInputFormat csvFormat =      new
>>> > > RowCsvInputFormat(            new Path(path),
>>> > >
>>> > > fieldTypes);csvFormat.setSkipFirstLineAsHeader(true);csvFormat.setNestedFileEnumeration(true);DataStream<Row>
>>> > > lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE,
>>> > > -1);lines.map(value -> value).print();*
>>> > >
>>> > >
>>> > > Any help is highly appreciated.
>>> > >
>>> > > Thanks,
>>> > > -Deep
>>> > >
>>> > >
>>> > >
>>> 
>> 
> 

Reply via email to