Hi Kostas and Arvid,

Thanks for your suggestions.

The small files were already created, and I am trying to roll a few of them into a bigger file while sinking. But due to the custom bucket assigner, it is hard to get more files within the same prefix during the configured checkpointing interval. For example, <bucket>/prefix1/prefix2/YY/MM/DD/HH is our structure in S3, the checkpointing interval is 5 minutes, prefix1 has 40 different values, and prefix2 has 10000+ values. Within the 5-minute interval, we get no more than 5-10 part files in each of these prefixes.

Regarding PrintStream, I will figure out how to use SimpleStringEncoder on a Tuple, as I only need to write the tuple.f2 element to the file. If you can guide me on how to do it, I would appreciate it.

I will try Arvid's suggestion of increasing fs.s3a.connection.maximum. I was trying to find information about these parameters and couldn't find any. Is there a place where I can look at the list of these config params? Also, I am using s3:// as the scheme; would fs.s3a.connection.maximum affect that too, or is there a separate param like fs.s3.connection.maximum?
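
For the encoder, something like the sketch below is what I have in mind (untested; it assumes the element type stays Tuple3<String, String, String> and that writing f2 as UTF-8 followed by a newline is all that is needed):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.Encoder;
    import org.apache.flink.api.java.tuple.Tuple3;

    // Untested sketch: writes only the f2 field of each tuple, one record per
    // line, without going through PrintStream (similar to SimpleStringEncoder).
    public class TupleF2Encoder implements Encoder<Tuple3<String, String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void encode(Tuple3<String, String, String> element, OutputStream stream)
                throws IOException {
            stream.write(element.f2.getBytes(StandardCharsets.UTF_8));
            stream.write('\n');
        }
    }

I would then pass it to the sink as .forRowFormat(new Path("s3://BUCKET"), new TupleF2Encoder()) instead of the current lambda.

For the connection pool, this is what I was planning to put in flink-conf.yaml (assuming flink-s3-fs-hadoop forwards fs.s3a.* keys to the underlying S3A client; please correct me if the s3:// scheme needs a different key):

    # assumption: flink-s3-fs-hadoop forwards fs.s3a.* keys to the S3A client
    fs.s3a.connection.maximum: 96   # example value; roughly slots * 3, as Arvid suggested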

On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi all,
>
> @Venkata, do you have many small files being created, as Arvid suggested?
> If yes, then I tend to agree that S3 is probably not the best sink,
> although I did not get that from your description.
> In addition, instead of PrintStream you can have a look at the code of the
> SimpleStringEncoder in Flink [1] for a bit more efficient implementation.
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
>
> On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Venkata,
>>
>> are the many small files intended, or is it rather an issue of our commit
>> on checkpointing? If so, then FLINK-11499 [1] should help you. The design
>> is close to done; unfortunately the implementation will not make it into 1.11.
>>
>> In any case, I'd look at the parameter fs.s3a.connection.maximum, as you
>> store both state and data on S3. I'd probably go with slots * 3 or even
>> higher.
>>
>> Lastly, the way you output elements also looks a bit suspicious.
>> PrintStream is not known for great performance. I'm also surprised that
>> it works without manual flushing.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11499
>>
>> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <jornfra...@gmail.com> wrote:
>>
>>> I think S3 is the wrong storage backend for this volume of small
>>> messages. Try to use a NoSQL database, or write multiple messages into
>>> one file in S3 (10000 or 100000).
>>>
>>> If you still want to go with your scenario, then try a network-optimized
>>> instance, use s3a in Flink, and configure S3 entropy.
>>>
>>> On 31.05.2020 at 15:30, venkata sateesh` kolluru <vkollur...@gmail.com> wrote:
>>>
>>> Hi David,
>>>
>>> The average size of each file is around 30 KB and I have a checkpoint
>>> interval of 5 minutes. Some files are even 1 KB; because of checkpointing,
>>> some files are merged into one big file of around 300 MB.
>>>
>>> With 120 million files and 4 TB, if the rate of transfer is 300 per
>>> minute, it will take weeks to write to S3.
>>>
>>> I have tried to increase the parallelism of the sink but I don't see any
>>> improvement.
>>>
>>> The sink record is Tuple3<String,String,String>; the actual content of
>>> the file is f2. This content is written to <s3 bucket>/f0/f1/part*-*.
>>>
>>> I guess the prefix determination in the CustomBucketAssigner won't be
>>> causing this delay?
>>>
>>> Could you please shed some light on writing a custom S3 sink?
>>>
>>> Thanks
>>>
>>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com> wrote:
>>>
>>>> Hi Venkata.
>>>>
>>>> 300 requests per minute looks like about 200 ms per request, which
>>>> should be a normal response time to send a file if there isn't any speed
>>>> limitation (how big are the files?).
>>>>
>>>> Have you changed the parallelism to be higher than 1? I also recommend
>>>> limiting the source parallelism, because it can consume pretty fast from
>>>> Kafka and create some kind of backpressure.
>>>>
>>>> I don't have much experience with StreamingFileSink, because I ended up
>>>> using a custom S3 sink, but I did have some issues writing to S3 because
>>>> the requests weren't parallelised. Check this thread:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>>
>>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <vkollur...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have posted the same question on Stack Overflow but didn't get any
>>>>> response, so I am posting it here for help.
>>>>>
>>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>>
>>>>> Details:
>>>>>
>>>>> I am working on a Flink application on Kubernetes (EKS) which consumes
>>>>> data from Kafka and writes it to S3.
>>>>>
>>>>> We have around 120 million XML messages, about 4 TB in total, in Kafka.
>>>>> Consuming from Kafka is super fast.
>>>>>
>>>>> These are just string messages from Kafka.
>>>>>
>>>>> There is high backpressure while writing to S3. We are not even hitting
>>>>> the S3 PUT request limit, which is around 3500 requests/sec. I am seeing
>>>>> only 300 writes per minute to S3, which is very slow.
>>>>>
>>>>> I am using StreamingFileSink to write to S3 with OnCheckpointRollingPolicy
>>>>> as the rolling policy.
>>>>>
>>>>> I am using flink-s3-fs-hadoop-*.jar and s3:// as the path (not s3a:// or
>>>>> s3p://).
>>>>>
>>>>> Other than this I don't have any config related to S3.
>>>>>
>>>>> StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>>>>>     .forRowFormat(new Path("s3://BUCKET"),
>>>>>         (Tuple3<String, String, String> element, OutputStream stream) -> {
>>>>>             PrintStream out = new PrintStream(stream);
>>>>>             out.println(element.f2);
>>>>>         })
>>>>>     // Determine component type for each record
>>>>>     .withBucketAssigner(new CustomBucketAssigner())
>>>>>     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>>     .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>>>>     .build();
>>>>>
>>>>> Is there anything that we can optimize for S3 in StreamingFileSink or in
>>>>> flink-conf.yaml?
>>>>>
>>>>> Like using a bulk format, or any config params like fs.s3.maxThreads, etc.
>>>>>
>>>>> For checkpointing too I am using s3:// instead of s3p:// or s3a://:
>>>>>
>>>>> env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));
>>>>> env.enableCheckpointing(300000);
>>>>>
>>
>> --
>> Arvid Heise | Senior Java Developer
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany