I think S3 is the wrong storage backend for this volume of small messages. 
Try a NoSQL database, or batch multiple messages into one file in S3 
(10,000 or 100,000 per file).
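
A rough sketch of the batching idea (illustrative only; `messages` stands 
for the incoming Tuple3 stream described in the quoted mail below, and the 
key layout is an assumption):

    // Sketch: concatenate 10,000 messages per target prefix into a single
    // record, so each S3 PUT uploads one large file instead of one tiny one.
    DataStream<Tuple3<String, String, String>> batched = messages
            .keyBy(t -> t.f0 + "/" + t.f1)  // group records by S3 prefix
            .countWindow(10_000)            // close a batch every 10,000 records
            .reduce((a, b) -> new Tuple3<>(a.f0, a.f1, a.f2 + "\n" + b.f2));

The batched stream then goes to the StreamingFileSink as before, with 
roughly 10,000x fewer PUT requests.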

If you still want to go with your scenario, then try a network-optimized 
instance, use s3a in Flink, and configure S3 entropy injection.
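
Entropy injection is configured in flink-conf.yaml; a minimal example (the 
marker string and length here are just sample values):

    s3.entropy.key: _entropy_
    s3.entropy.length: 4

With this, an `_entropy_` marker placed in the checkpoint path is replaced 
by 4 random characters when checkpoint files are written (and simply removed 
for other paths), which spreads the load across S3 key prefixes.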

> On 31.05.2020 at 15:30, venkata sateesh` kolluru 
> <vkollur...@gmail.com> wrote:
> 
> 
> Hi David,
> 
> The average size of each file is around 30 KB, and I have a checkpoint interval 
> of 5 minutes. Some files are even 1 KB; because of checkpointing, some files are 
> merged into one big file of around 300 MB.
> 
> With 120 million files and 4 TB, at a transfer rate of 300 files per minute, it 
> will take weeks to write everything to S3.
> 
> I have tried increasing the parallelism of the sink, but I don't see any 
> improvement. 
> 
> The sink record is Tuple3<String, String, String>; the actual content of the file 
> is f2. This content is written to <s3 bucket>/f0/f1/part*-* 
> 
> I guess the prefix determination in the CustomBucketAssigner won't be causing this 
> delay?
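> 
> (For reference, an assumed shape of the CustomBucketAssigner, matching the 
> <s3 bucket>/f0/f1/part*-* layout above; a sketch, not the actual code:)
> 
>     import org.apache.flink.api.java.tuple.Tuple3;
>     import org.apache.flink.core.io.SimpleVersionedSerializer;
>     import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
>     import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
> 
>     public class CustomBucketAssigner
>             implements BucketAssigner<Tuple3<String, String, String>, String> {
> 
>         @Override
>         public String getBucketId(Tuple3<String, String, String> element,
>                 Context context) {
>             // Route each record to the f0/f1 prefix; a cheap string
>             // concatenation, so unlikely to be a bottleneck by itself.
>             return element.f0 + "/" + element.f1;
>         }
> 
>         @Override
>         public SimpleVersionedSerializer<String> getSerializer() {
>             return SimpleVersionedStringSerializer.INSTANCE;
>         }
>     }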
> 
> Could you please shed some light on writing a custom S3 sink?
> 
> Thanks
> 
> 
>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com> wrote:
>> Hi Venkata. 
>> 
>> 300 requests per minute works out to about 200 ms per request, which should be a 
>> normal response time to send a file if there isn't any speed limitation (how 
>> big are the files?).
>> 
>> Have you set the parallelism higher than 1? I also recommend limiting the 
>> source parallelism, because the source can consume from Kafka very fast and 
>> create backpressure.
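>> 
>> A minimal sketch of that wiring (kafkaConsumer and sink are placeholders 
>> for your own source and StreamingFileSink; the numbers are only 
>> illustrative):
>> 
>>     // Cap the Kafka source so it cannot outrun the sink, and give the
>>     // S3 sink more parallel writers for concurrent uploads.
>>     DataStream<Tuple3<String, String, String>> input = env
>>             .addSource(kafkaConsumer)
>>             .setParallelism(2);          // limit consumption speed
>> 
>>     input.addSink(sink)
>>             .setParallelism(16);         // more concurrent S3 writers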
>> 
>> I don't have much experience with StreamingFileSink, because I ended up 
>> using a custom S3 sink, but I did have some issues writing to S3 because the 
>> requests weren't parallelised. Check this thread: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>> 
>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru 
>>> <vkollur...@gmail.com> wrote:
>>> Hello,
>>> 
>>> I have posted the same question on Stack Overflow but didn't get any response, 
>>> so I am posting it here for help.
>>> 
>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>> 
>>> Details:
>>> 
>>> I am working on a Flink application on Kubernetes (EKS) which consumes data 
>>> from Kafka and writes it to S3.
>>> 
>>> We have around 120 million XML messages, about 4 TB in total, in Kafka. 
>>> Consuming from Kafka is super fast.
>>> 
>>> These are just string messages from Kafka. 
>>> 
>>> There is high backpressure while writing to S3. We are not even hitting 
>>> the S3 PUT request limit, which is around 3,500 requests/sec. I am seeing 
>>> only 300 writes per minute to S3, which is very slow.
>>> 
>>> I am using StreamingFileSink to write to S3 with the rolling policy set to 
>>> OnCheckpointRollingPolicy.
>>> 
>>> I am using flink-s3-fs-hadoop-*.jar and s3:// as the path scheme (not s3a:// 
>>> or s3p://).
>>> 
>>> Other than this, I don't have any configuration related to S3:
>>> 
>>>     StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>>>             .forRowFormat(new Path("s3://BUCKET"),
>>>                     (Tuple3<String, String, String> element,
>>>                             OutputStream stream) -> {
>>>                         PrintStream out = new PrintStream(stream);
>>>                         out.println(element.f2);
>>>                     })
>>>             // Determine the bucket (S3 prefix) for each record
>>>             .withBucketAssigner(new CustomBucketAssigner())
>>>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>             .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>>             .build(); 
>>> Is there anything that we can optimize for S3, either in StreamingFileSink or 
>>> in flink-conf.yaml?
>>> 
>>> For example, using a bulk format, or config params like fs.s3.maxThreads, etc.
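>>> 
>>> (Assuming the usual key mirroring of flink-s3-fs-hadoop, where s3.* keys in 
>>> flink-conf.yaml are forwarded to the Hadoop S3A client, one illustrative 
>>> sketch of such parameters would be:)
>>> 
>>>     # illustrative values only
>>>     s3.threads.max: 64          # mirrored to fs.s3a.threads.max
>>>     s3.connection.maximum: 128  # mirrored to fs.s3a.connection.maximum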
>>> 
>>> For checkpointing too, I am using s3:// instead of s3p:// or s3a://:
>>> 
>>>     env.setStateBackend((StateBackend)
>>>             new RocksDBStateBackend("s3://checkpoint_bucket", true));
>>>     env.enableCheckpointing(300000); // checkpoint every 5 minutes
>>> 
