Hi David,

The avg size of each file is around 30KB and I have checkpoint interval of
5 minutes. Some files are even 1 kb, because of checkpoint some files are
merged into 1 big file around 300MB.

With 120 million files and 4Tb, if the rate of transfer is 300 per minute,
it is taking weeks to write to s3.

I have tried to increase parallelism of sink but I dont see any
improvement.

The sink record is Tuple3<String,String,String>, the actual content of file
is f2. This is content is written to <s3 bucket>/f0/f1/part*-*

I guess the prefix determination in custombucketassigner wont be causing
this delay?

Could you please shed some light on writing custom s3 sink ?

Thanks


On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com> wrote:

> Hi Venkata.
>
> 300 requests per minute look like a 200ms per request, which should be a
> normal response time to send a file if there isn't any speed limitation
> (how big are the files?).
>
> Have you changed the parallelization to be higher than 1? I also recommend
> to limit the source parallelization, because it can consume pretty fast
> from Kafka and create some kind of backpressure.
>
> I don't any much experience with StreamingFileSink, because I've ended up
> using a custom S3Sink, but I did have some issues writing to S3 because the
> request wasn't parallelised. Check this thread,
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>
> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
> vkollur...@gmail.com> wrote:
>
>> Hello,
>>
>> I have posted the same in stackoverflow but didnt get any response. So
>> posting it here for help.
>>
>>
>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>
>> Details:
>>
>> I am working on a flink application on kubernetes(eks) which consumes
>> data from kafka and write it to s3.
>>
>> We have around 120 million xml messages of size 4TB in kafka. Consuming
>> from kafka is super fast.
>>
>> These are just string messages from kafka.
>>
>> There is a high back pressure while writing to s3. We are not even
>> hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am
>> seeing only 300 writes per minute to S3 which is very slow.
>>
>> I am using StreamFileSink to write to s3 with Rolling policy as
>> OnCheckpointPolicy.
>>
>> Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or
>> s3p)
>>
>> Other than this I dont have any config related to s3
>>
>>     StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
>>             .forRowFormat(new Path(s3://BUCKET),
>>                     (Tuple3<String,String, String> element, OutputStream 
>> stream) -> {
>>                         PrintStream out = new PrintStream(stream);
>>                         out.println(element.f2);
>>                     })
>>             // Determine component type for each record
>>             .withBucketAssigner(new CustomBucketAssigner())
>>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>             .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
>>             .build();
>>
>> Is there anything that we can optimize on s3 from streamfilesink or in
>> flink-conf.xml ?
>>
>> Like using bulkformat or any config params like fs.s3.maxThreads etc.
>>
>> For checkpointing too I am using s3:// instead of s3p or s3a
>>
>> env.setStateBackend((StateBackend) new 
>> RocksDBStateBackend(s3://checkpoint_bucket, true));
>>         env.enableCheckpointing(300000);
>>
>>
>>

Reply via email to