Hi all,

@Venkata, do you have many small files being created, as Arvid suggested? If
so, I tend to agree that S3 is probably not the best sink, although I did
not get that from your description.
In addition, instead of PrintStream, you can have a look at the code of the
SimpleStringEncoder in Flink [1] for a somewhat more efficient implementation.
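
As a rough illustration of the idea (a minimal sketch, not the actual Flink
class), the snippet below writes the UTF-8 bytes of each record plus a newline
directly to the stream, instead of wrapping the stream in a new PrintStream
per element. The class name LineEncoderSketch and its encode helper are
hypothetical names made up for this example; only plain JDK classes are used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class LineEncoderSketch {

    // Hypothetical helper (not part of Flink): writes one record as UTF-8
    // bytes followed by a newline, without allocating a PrintStream per call.
    static void encode(String element, OutputStream stream) throws IOException {
        stream.write(element.getBytes(StandardCharsets.UTF_8));
        stream.write('\n');
    }

    public static void main(String[] args) throws IOException {
        // An in-memory stream stands in for the part file the sink would open.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encode("<msg>a</msg>", out);
        encode("<msg>b</msg>", out);
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Assuming the same Tuple3 record as in the original post, the body of the
forRowFormat lambda could then pass element.f2 and the stream to such a helper.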

Cheers,
Kostas

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java


On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Venkata,
>
> Are the many small files intended, or are they rather a side effect of our
> committing on checkpoints? If the latter, FLINK-11499 [1] should help you.
> The design is close to done, but unfortunately the implementation will not
> make it into 1.11.
>
> In any case, I'd look at the parameter fs.s3a.connection.maximum, as you
> store both state and data on S3. I'd probably go with three times the
> number of slots or even higher.
>
> Lastly, the way you output elements also looks a bit suspicious.
> PrintStream is not known for great performance. I'm also surprised that it
> works without manual flushing.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11499
>
> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <jornfra...@gmail.com> wrote:
>
>> I think S3 is the wrong storage backend for this volume of small messages.
>> Try to use a NoSQL database or write multiple messages into one file in
>> S3 (10000 or 100000).
>>
>> If you still want to go with your scenario, then try a network-optimized
>> instance, use s3a in Flink, and configure S3 entropy.
>>
>> On 31.05.2020 at 15:30, venkata sateesh` kolluru <
>> vkollur...@gmail.com> wrote:
>>
>> 
>> Hi David,
>>
>> The average size of each file is around 30 KB, and I have a checkpoint
>> interval of 5 minutes. Some files are as small as 1 KB; because of
>> checkpointing, some files are merged into one big file of around 300 MB.
>>
>> With 120 million files and 4 TB, if the transfer rate is 300 files per
>> minute, it is taking weeks to write to S3.
>>
>> I have tried to increase the parallelism of the sink, but I don't see any
>> improvement.
>>
>> The sink record is Tuple3<String,String,String>; the actual content of the
>> file is f2. This content is written to <s3 bucket>/f0/f1/part*-*
>>
>> I guess the prefix determination in CustomBucketAssigner won't be causing
>> this delay?
>>
>> Could you please shed some light on writing a custom S3 sink?
>>
>> Thanks
>>
>>
>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com>
>> wrote:
>>
>>> Hi Venkata.
>>>
>>> 300 requests per minute works out to about 200 ms per request, which
>>> should be a normal response time for sending a file if there isn't any
>>> speed limitation (how big are the files?).
>>>
>>> Have you changed the parallelism to be higher than 1? I also recommend
>>> limiting the source parallelism, because it can consume pretty fast from
>>> Kafka and create some kind of backpressure.
>>>
>>> I don't have much experience with StreamingFileSink, because I ended up
>>> using a custom S3Sink, but I did have some issues writing to S3 because
>>> the requests weren't parallelised. Check this thread:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>
>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
>>> vkollur...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have posted the same question on Stack Overflow but didn't get any
>>>> response, so I am posting it here for help.
>>>>
>>>>
>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>
>>>> Details:
>>>>
>>>> I am working on a Flink application on Kubernetes (EKS) which consumes
>>>> data from Kafka and writes it to S3.
>>>>
>>>> We have around 120 million XML messages, 4 TB in total, in Kafka.
>>>> Consuming from Kafka is super fast.
>>>>
>>>> These are just string messages from kafka.
>>>>
>>>> There is high backpressure while writing to S3. We are not even
>>>> hitting the S3 PUT request limit, which is around 3500 requests/sec. I am
>>>> seeing only 300 writes per minute to S3, which is very slow.
>>>>
>>>> I am using StreamingFileSink to write to S3, with
>>>> OnCheckpointRollingPolicy as the rolling policy.
>>>>
>>>> Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or
>>>> s3p)
>>>>
>>>> Other than this, I don't have any config related to S3.
>>>>
>>>>     StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
>>>>             .forRowFormat(new Path("s3://BUCKET"),
>>>>                     (Tuple3<String,String, String> element, OutputStream stream) -> {
>>>>                         PrintStream out = new PrintStream(stream);
>>>>                         out.println(element.f2);
>>>>                     })
>>>>             // Determine component type for each record
>>>>             .withBucketAssigner(new CustomBucketAssigner())
>>>>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>             .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
>>>>             .build();
>>>>
>>>> Is there anything that we can optimize for S3 in StreamingFileSink or in
>>>> flink-conf.yaml?
>>>>
>>>> For example, using a bulk format or config params like fs.s3.maxThreads.
>>>>
>>>> For checkpointing too I am using s3:// instead of s3p or s3a
>>>>
>>>>     env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));
>>>>     env.enableCheckpointing(300000); // checkpoint every 5 minutes
>>>>
>>>>
>>>>
>
