Thanks Arvid! Will try to increase the property you recommended and will post the update.
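For reference, the change being discussed would look roughly like this in flink-conf.yaml. This is only a sketch: the value is an illustrative guess in the spirit of the "slot*3 or even higher" suggestion below, and it assumes the flink-s3-fs-hadoop filesystem picks up fs.s3a.* keys from flink-conf.yaml.

    # flink-conf.yaml (sketch; value is illustrative, tune to slots per TaskManager)
    fs.s3a.connection.maximum: 64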
On Sat, Jun 6, 2020, 7:33 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Venkata,
>
> you can find them on the Hadoop AWS page (we are just using it as a
> library) [1].
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration
>
> On Sat, Jun 6, 2020 at 1:26 AM venkata sateesh` kolluru
> <vkollur...@gmail.com> wrote:
>
>> Hi Kostas and Arvid,
>>
>> Thanks for your suggestions.
>>
>> The small files were already created and I am trying to roll a few of
>> them into a big file while sinking. But due to the custom bucket
>> assigner, it is hard to get more files within the same prefix in the
>> specified checkpointing time.
>>
>> For example: <bucket>/prefix1/prefix2/YY/MM/DD/HH is our structure in
>> S3. The checkpointing interval is 5 minutes. prefix1 has 40 different
>> values and prefix2 has 10000+ values.
>> Within the 5-minute interval, we end up with no more than 5-10 part
>> files in these prefixes.
>>
>> Regarding PrintStream, I will figure out how to use SimpleStringEncoder
>> on a Tuple, as I only need to write the tuple.f2 element to the file. If
>> you can guide me on how to do it, I would appreciate it (a rough sketch
>> of such an encoder appears at the end of this thread).
>>
>> I will try Arvid's suggestion of increasing fs.s3a.connection.maximum. I
>> was trying to find information about these parameters and couldn't find
>> it anywhere. Is there a place where I could look at the list of these
>> config params?
>>
>> Also, I am using s3:// as the prefix; would fs.s3a.connection.maximum
>> affect that too, or is there a separate param like
>> fs.s3.connection.maximum?
>>
>> On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> @Venkata, do you have many small files being created, as Arvid
>>> suggested? If yes, then I tend to agree that S3 is probably not the
>>> best sink, although I did not get that from your description.
>>> In addition, instead of PrintStream you can have a look at the code of
>>> the SimpleStringEncoder in Flink [1] for a bit more efficient
>>> implementation.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
>>>
>>> On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Venkata,
>>>>
>>>> are the many small files intended, or is it rather an issue of our
>>>> commit on checkpointing? If so, then FLINK-11499 [1] should help you.
>>>> The design is close to done; unfortunately, the implementation will
>>>> not make it into 1.11.
>>>>
>>>> In any case, I'd look at the parameter fs.s3a.connection.maximum, as
>>>> you store both state and data on S3. I'd probably go with slot*3 or
>>>> even higher.
>>>>
>>>> Lastly, the way you output elements also looks a bit suspicious.
>>>> PrintStream is not known for great performance. I'm also surprised
>>>> that it works without manual flushing.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-11499
>>>>
>>>> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <jornfra...@gmail.com>
>>>> wrote:
>>>>
>>>>> I think S3 is the wrong storage backend for this volume of small
>>>>> messages.
>>>>> Try to use a NoSQL database, or write multiple messages into one
>>>>> file in S3 (10000 or 100000).
>>>>>
>>>>> If you still want to go with your scenario, then try a
>>>>> network-optimized instance, use s3a in Flink, and configure S3
>>>>> entropy.
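On the entropy point: Flink's entropy injection for S3 is configured in flink-conf.yaml with the two keys below. A minimal sketch, with illustrative values; note that, as far as I understand, entropy is only injected for checkpoint data files, and the checkpoint path has to contain the entropy key.

    # flink-conf.yaml (sketch; values are illustrative)
    s3.entropy.key: _entropy_
    s3.entropy.length: 4

    # The checkpoint path must then contain the key, e.g.:
    #   state.checkpoints.dir: s3://<checkpoint_bucket>/_entropy_/checkpoints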
>>>>>
>>>>> On 31.05.2020 at 15:30, venkata sateesh` kolluru
>>>>> <vkollur...@gmail.com> wrote:
>>>>>
>>>>> Hi David,
>>>>>
>>>>> The average size of each file is around 30KB and I have a checkpoint
>>>>> interval of 5 minutes. Some files are even 1KB; because of
>>>>> checkpointing, some files are merged into one big file of around
>>>>> 300MB.
>>>>>
>>>>> With 120 million files and 4TB, if the rate of transfer is 300 per
>>>>> minute, it would take weeks to write to S3.
>>>>>
>>>>> I have tried to increase the parallelism of the sink, but I don't see
>>>>> any improvement.
>>>>>
>>>>> The sink record is Tuple3<String,String,String>; the actual content
>>>>> of the file is f2. This content is written to <s3 bucket>/f0/f1/part*-*
>>>>>
>>>>> I guess the prefix determination in the CustomBucketAssigner won't be
>>>>> causing this delay? (A sketch of what such an assigner might look
>>>>> like appears below.)
>>>>>
>>>>> Could you please shed some light on writing a custom S3 sink?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Venkata.
>>>>>>
>>>>>> 300 requests per minute looks like 200ms per request, which should
>>>>>> be a normal response time to send a file if there isn't any speed
>>>>>> limitation (how big are the files?).
>>>>>>
>>>>>> Have you changed the parallelization to be higher than 1? I also
>>>>>> recommend limiting the source parallelization, because it can
>>>>>> consume pretty fast from Kafka and create some kind of backpressure.
>>>>>>
>>>>>> I don't have much experience with StreamingFileSink, because I ended
>>>>>> up using a custom S3Sink, but I did have some issues writing to S3
>>>>>> because the requests weren't parallelised. Check this thread:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>>>>
>>>>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru
>>>>>> <vkollur...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have posted the same on Stack Overflow but didn't get any
>>>>>>> response, so posting it here for help.
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>>>>
>>>>>>> Details:
>>>>>>>
>>>>>>> I am working on a Flink application on Kubernetes (EKS) which
>>>>>>> consumes data from Kafka and writes it to S3.
>>>>>>>
>>>>>>> We have around 120 million XML messages, totaling 4TB, in Kafka.
>>>>>>> Consuming from Kafka is super fast.
>>>>>>>
>>>>>>> These are just string messages from Kafka.
>>>>>>>
>>>>>>> There is high backpressure while writing to S3. We are not even
>>>>>>> hitting the S3 PUT request limit, which is around 3500
>>>>>>> requests/sec. I am seeing only 300 writes per minute to S3, which
>>>>>>> is very slow.
>>>>>>>
>>>>>>> I am using StreamingFileSink to write to S3 with the rolling policy
>>>>>>> set to OnCheckpointRollingPolicy.
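The actual CustomBucketAssigner is not shown anywhere in this thread, but based on the <s3 bucket>/f0/f1/part*-* description above, a minimal sketch of what such an assigner might look like is below. The class name, the use of f0/f1 as the bucket path, and the Tuple3 type are assumptions taken from the description, not the real implementation.

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    // Hypothetical reconstruction: buckets each record by its first two fields,
    // so part files end up under <bucket>/f0/f1/ as described in the message above.
    public class CustomBucketAssigner
            implements BucketAssigner<Tuple3<String, String, String>, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String getBucketId(Tuple3<String, String, String> element,
                                  BucketAssigner.Context context) {
            return element.f0 + "/" + element.f1;
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }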
>>>>>>>
>>>>>>> I am using flink-s3-fs-hadoop-*.jar and s3:// as the path (not
>>>>>>> using s3a or s3p).
>>>>>>>
>>>>>>> Other than this I don't have any config related to S3.
>>>>>>>
>>>>>>> StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>>>>>>>         .forRowFormat(new Path("s3://BUCKET"),
>>>>>>>                 (Tuple3<String, String, String> element, OutputStream stream) -> {
>>>>>>>                     PrintStream out = new PrintStream(stream);
>>>>>>>                     out.println(element.f2);
>>>>>>>                 })
>>>>>>>         // Determine the bucket (output prefix) for each record
>>>>>>>         .withBucketAssigner(new CustomBucketAssigner())
>>>>>>>         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>>>>         .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>>>>>>         .build();
>>>>>>>
>>>>>>> Is there anything that we can optimize for S3 in StreamingFileSink
>>>>>>> or in flink-conf.yaml?
>>>>>>>
>>>>>>> For example, using a bulk format, or any config params like
>>>>>>> fs.s3.maxThreads, etc.
>>>>>>>
>>>>>>> For checkpointing too, I am using s3:// instead of s3p or s3a:
>>>>>>>
>>>>>>> env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));
>>>>>>> env.enableCheckpointing(300000);
>>>>>>>
>>>>
>>>> --
>>>> Arvid Heise | Senior Java Developer
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Arvid Heise | Senior Java Developer
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
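Following up on the SimpleStringEncoder suggestion above, here is a minimal sketch of how the PrintStream lambda in the sink could be replaced with a dedicated Encoder that writes only the f2 field, one record per line. The class name is made up for illustration, and the one-record-per-line output is an assumption; this is a sketch, not the code actually used in the job.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.Encoder;
    import org.apache.flink.api.java.tuple.Tuple3;

    // Writes only the f2 field of each Tuple3, one record per line, without
    // allocating a new PrintStream per element.
    public class Tuple3F2Encoder implements Encoder<Tuple3<String, String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void encode(Tuple3<String, String, String> element, OutputStream stream)
                throws IOException {
            stream.write(element.f2.getBytes(StandardCharsets.UTF_8));
            stream.write('\n');
        }
    }

The sink construction would then take the encoder instead of the lambda, keeping the same bucket assigner and rolling policy:

    StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
            .forRowFormat(new Path("s3://BUCKET"), new Tuple3F2Encoder())
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();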