It could be related to S3, which seems to be configured for eventual consistency. 
Configuring strong consistency, if that is an option for you, might help.

However, I would recommend replacing S3 with a NoSQL database (since you are on Amazon, 
DynamoDB plus DynamoDB Streams would help; alternatively SNS or SQS). Files that small, 
arriving at that rate, are not a good fit for S3 or HDFS.
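
For the SQS route, here is a rough sketch of what a custom Flink source polling a
queue could look like. The class name SqsSource and the queueUrl parameter are just
placeholders, and a production source would also need to integrate with checkpointing
to get at-least-once guarantees; this is only meant to show the shape of it:

    import scala.collection.JavaConverters._

    import com.amazonaws.services.sqs.AmazonSQSClientBuilder
    import com.amazonaws.services.sqs.model.ReceiveMessageRequest
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

    // Minimal sketch: each SQS message body becomes one record in the stream.
    class SqsSource(queueUrl: String) extends SourceFunction[String] {

      @volatile private var running = true

      override def run(ctx: SourceContext[String]): Unit = {
        val sqs = AmazonSQSClientBuilder.defaultClient()
        while (running) {
          // Long-poll for up to 10 messages per request.
          val request = new ReceiveMessageRequest(queueUrl)
            .withMaxNumberOfMessages(10)
            .withWaitTimeSeconds(20)
          val messages = sqs.receiveMessage(request).getMessages.asScala
          for (msg <- messages) {
            // Emit under the checkpoint lock, then delete the message from the queue.
            ctx.getCheckpointLock.synchronized {
              ctx.collect(msg.getBody)
            }
            sqs.deleteMessage(queueUrl, msg.getReceiptHandle)
          }
        }
      }

      override def cancel(): Unit = running = false
    }

You would then replace readFile with streamEnv.addSource(new SqsSource(queueUrl)) and
have the producers (or S3 event notifications, if you keep writing the files) publish
to the queue.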

> On 24. Jul 2018, at 07:59, Averell <lvhu...@gmail.com> wrote:
> 
> Good day everyone,
> 
> I have a Flink job that has an S3 folder as a source, and we keep putting
> thousands of small (around 1KB each) gzip files into that folder, with the
> rate of about 5000 files per minute. Here is how I created that source in
> Scala:
> 
>    val my_input_format = new TextInputFormat(new
>        org.apache.flink.core.fs.Path(my_path))
>    my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
>    my_input_format.setNestedFileEnumeration(true)
> 
>    val my_raw_stream = streamEnv
>            .readFile(my_input_format,
>                my_path,
>                FileProcessingMode.PROCESS_CONTINUOUSLY,
>                1000)
> The problem is, with the monitoring interval of 1,000ms as above, about 20%
> of the files were missed. From Apache Flink Dashboard, at the subsequent
> operators, I could only see ~80% of the total number of files recorded
> ("Records sent" column).
> 
> If I increase the monitoring interval, the number of missed files would
> reduce. At 5,000ms, it is about 10%, and at 30,000ms, only about 2% missed.
> 
> No WARNING/ERROR recorded though.
> 
> I could not simulate this in HDFS, as I could not reach that high file
> writing speed in our cluster.
> 
> Could someone please help? Thank you very much.
> 
