It could be related to S3, which appears to be eventually consistent in your setup. Maybe it helps to configure strong consistency.
However, I recommend replacing S3 with a NoSQL database (since you are on Amazon, DynamoDB would help, plus DynamoDB Streams; alternatively SNS or SQS). Such small files at such a high rate are not a good fit for S3 or HDFS. A rough sketch of the SQS variant is at the bottom of this mail, below the quote.

> On 24. Jul 2018, at 07:59, Averell <lvhu...@gmail.com> wrote:
>
> Good day everyone,
>
> I have a Flink job that has an S3 folder as a source, and we keep putting
> thousands of small (around 1KB each) gzip files into that folder, at a
> rate of about 5000 files per minute. Here is how I created that source in
> Scala:
>
> val my_input_format = new TextInputFormat(
>   new org.apache.flink.core.fs.Path(my_path))
> my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
> my_input_format.setNestedFileEnumeration(true)
>
> val my_raw_stream = streamEnv
>   .readFile(my_input_format,
>     my_path,
>     FileProcessingMode.PROCESS_CONTINUOUSLY,
>     1000)
>
> The problem is, with the monitoring interval of 1,000 ms as above, about 20%
> of the files were missed. From the Apache Flink Dashboard, at the subsequent
> operators, I could only see ~80% of the total number of files recorded
> ("Records sent" column).
>
> If I increase the monitoring interval, the number of missed files drops. At
> 5,000 ms it is about 10%, and at 30,000 ms only about 2% are missed.
>
> No WARNING/ERROR recorded though.
>
> I could not simulate this in HDFS, as I could not reach that high a file
> writing speed in our cluster.
>
> Could someone please help? Thank you very much.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
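For what it's worth, here is a minimal, untested sketch of the SQS variant: instead of writing each record as a small gzip file to S3 and scanning the folder, each record is published to an SQS queue and a custom SourceFunction long-polls it. The class name and queue URL are made up on my side, it assumes the AWS SDK for Java (v1) on the classpath, and it has no checkpointing / exactly-once guarantees; it is only meant to show the shape of the approach.

import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.collection.JavaConverters._

// Hypothetical source replacing the S3 folder scan: every record that used to
// become a small gzip file is instead published to SQS and consumed here.
class SqsSource(queueUrl: String) extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()
    while (running) {
      // Long-poll for up to 10 messages at a time to avoid hammering the API.
      val request = new ReceiveMessageRequest(queueUrl)
        .withMaxNumberOfMessages(10)
        .withWaitTimeSeconds(20)
      val messages = sqs.receiveMessage(request).getMessages.asScala
      for (msg <- messages) {
        ctx.getCheckpointLock.synchronized {
          ctx.collect(msg.getBody) // emit the raw record instead of a file path
        }
        // Delete only after the record has been handed to the downstream operators.
        sqs.deleteMessage(queueUrl, msg.getReceiptHandle)
      }
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

// Usage, replacing the readFile() source (queue URL is fictional):
// val my_raw_stream = streamEnv.addSource(
//   new SqsSource("https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue"))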