Hi Kostas and Arvid,

Thanks for your suggestions.

The small files have already been created, and I am trying to roll several of
them into one big file while sinking. But because of the custom bucket
assigner, it is hard to get more files within the same prefix in the
specified checkpointing interval.

For example:
<bucket>/prefix1/prefix2/YY/MM/DD/HH is our structure in S3.
The checkpointing interval is 5 minutes. prefix1 has 40 different values and
prefix2 has 10000+ values.
Within the 5-minute interval, a part file in any of these prefixes
accumulates no more than 5-10 of the small files.

Regarding PrintStream, I will figure out how to use SimpleStringEncoder on a
Tuple, as I only need to write the tuple.f2 element to the file. If you can
guide me on how to do it, I would appreciate it.
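
For reference, a minimal sketch of one way this could look, assuming the
goal is simply to write f2 as one newline-terminated UTF-8 line per record.
PayloadLineEncoder is a hypothetical helper name, not a Flink class; it
follows the same "bytes plus '\n'" approach as SimpleStringEncoder, and the
wiring into forRowFormat shown in the trailing comment is an untested
assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper (not part of Flink): converts one record payload into
// the bytes of a single newline-terminated UTF-8 line.
final class PayloadLineEncoder {

    private PayloadLineEncoder() {}

    static byte[] encodeLine(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        // Copy into an array one byte longer and append the line terminator.
        byte[] line = Arrays.copyOf(body, body.length + 1);
        line[body.length] = '\n';
        return line;
    }
}

// Assumed wiring into the sink, replacing the PrintStream lambda:
//   .forRowFormat(new Path("s3://BUCKET"),
//           (Tuple3<String, String, String> element, OutputStream stream) ->
//                   stream.write(PayloadLineEncoder.encodeLine(element.f2)))
```

Unlike PrintStream.println, this always uses UTF-8 and '\n' rather than the
platform default charset and line separator, and it avoids allocating a new
PrintStream for every record.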

I will try Arvid's suggestion of increasing fs.s3a.connection.maximum. I was
trying to find documentation on these parameters and could not find it
anywhere. Is there a place where I could look at the list of these config
params?

Also, I am using s3:// as the prefix. Would fs.s3a.connection.maximum affect
that too, or is there a separate param like fs.s3.connection.maximum?

On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi all,
>
> @Venkata, Do you have many small files being created as Arvid suggested?
> If yes, then I tend to agree that S3 is probably not the best sink.
> Although I did not get that from your description.
> In addition, instead of PrintStream you can have a look at the code of the
> SimpleStringEncoder in Flink [1] for a bit more efficient implementation.
>
> Cheers,
> Kostas
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
>
>
> On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Venkata,
>>
>> are the many small files intended or is it rather an issue of our commit
>> on checkpointing? If so then FLINK-11499 [1] should help you. Design is
>> close to done, unfortunately implementation will not make it into 1.11.
>>
>> In any case, I'd look at the parameter fs.s3a.connection.maximum, as you
>> store both state and data on S3. I'd probably go with slot*3 or even higher.
>>
>> Lastly, the way you output elements looks also a bit suspicious.
>> PrintStream is not known for great performance. I'm also surprised that it
>> works without manual flushing.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11499
>>
>> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <jornfra...@gmail.com> wrote:
>>
>>> I think S3 is the wrong storage backend for this volume of small
>>> messages.
>>> Try to use a NoSQL database or write multiple messages into one file in
>>> S3 (10000 or 100000)
>>>
>>> If you still want to go with your scenario then try a network optimized
>>> instance and use s3a in Flink and configure s3 entropy.
>>>
>>> On 31.05.2020 at 15:30, venkata sateesh` kolluru <
>>> vkollur...@gmail.com> wrote:
>>>
>>> Hi David,
>>>
>>> The average size of each file is around 30 KB and I have a checkpoint
>>> interval of 5 minutes. Some files are as small as 1 KB; because of
>>> checkpointing, some files are merged into one big file of around 300 MB.
>>>
>>> With 120 million files and 4 TB, if the rate of transfer is 300 per
>>> minute, it is taking weeks to write to S3.
>>>
>>> I have tried to increase the parallelism of the sink, but I don't see any
>>> improvement.
>>>
>>> The sink record is Tuple3<String,String,String>; the actual content of
>>> the file is f2. This content is written to <s3 bucket>/f0/f1/part*-*
>>>
>>> I guess the prefix determination in the custom bucket assigner won't be
>>> causing this delay?
>>>
>>> Could you please shed some light on writing custom s3 sink ?
>>>
>>> Thanks
>>>
>>>
>>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Venkata.
>>>>
>>>> 300 requests per minute works out to about 200 ms per request, which
>>>> should be a normal response time for sending a file if there isn't any
>>>> speed limitation (how big are the files?).
>>>>
>>>> Have you changed the parallelism to be higher than 1? I also recommend
>>>> limiting the source parallelism, because it can consume from Kafka very
>>>> quickly and create backpressure.
>>>>
>>>> I don't have much experience with StreamingFileSink, because I ended up
>>>> using a custom S3Sink, but I did have some issues writing to S3 because
>>>> the requests weren't parallelised. Check this thread:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>>
>>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
>>>> vkollur...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have posted the same question on Stack Overflow but didn't get any
>>>>> response, so I am posting it here for help.
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>>
>>>>> Details:
>>>>>
>>>>> I am working on a Flink application on Kubernetes (EKS) which consumes
>>>>> data from Kafka and writes it to S3.
>>>>>
>>>>> We have around 120 million XML messages, totalling 4 TB, in Kafka.
>>>>> Consuming from Kafka is super fast.
>>>>>
>>>>> These are just string messages from Kafka.
>>>>>
>>>>> There is high back pressure while writing to S3. We are not even
>>>>> hitting the S3 PUT request limit, which is around 3500 requests/sec. I am
>>>>> seeing only 300 writes per minute to S3, which is very slow.
>>>>>
>>>>> I am using StreamingFileSink to write to S3, with the rolling policy set
>>>>> to OnCheckpointRollingPolicy.
>>>>>
>>>>> I am using flink-s3-fs-hadoop-*.jar and s3:// as the path (not using
>>>>> s3a or s3p).
>>>>>
>>>>> Other than this, I don't have any config related to S3.
>>>>>
>>>>>     StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>>>>>             .forRowFormat(new Path("s3://BUCKET"),
>>>>>                     (Tuple3<String, String, String> element, OutputStream stream) -> {
>>>>>                         // Only the third field (f2) is written to the file
>>>>>                         PrintStream out = new PrintStream(stream);
>>>>>                         out.println(element.f2);
>>>>>                     })
>>>>>             // Determine component type for each record
>>>>>             .withBucketAssigner(new CustomBucketAssigner())
>>>>>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>>             .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>>>>             .build();
>>>>>
>>>>> Is there anything that we can optimize for S3, either in
>>>>> StreamingFileSink or in flink-conf.yaml?
>>>>>
>>>>> For example, using a bulk format, or config params like
>>>>> fs.s3.maxThreads etc.
>>>>>
>>>>> For checkpointing too, I am using s3:// instead of s3p or s3a.
>>>>>
>>>>>     env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));
>>>>>     // Checkpoint every 300000 ms (5 minutes)
>>>>>     env.enableCheckpointing(300000);
>>>>>
>>>>>
>>>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>
