Hi Venkata, you can find them on the Hadoop AWS page (we are just using it as a library) [1].
[1] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration

On Sat, Jun 6, 2020 at 1:26 AM venkata sateesh` kolluru <vkollur...@gmail.com> wrote:

> Hi Kostas and Arvid,
>
> Thanks for your suggestions.
>
> The small files were already created, and I am trying to roll a few of them
> into a big file while sinking. But due to the custom bucket assigner, it is
> hard to get more files within the same prefix in the specified checkpointing
> interval.
>
> For example:
> <bucket>/prefix1/prefix2/YY/MM/DD/HH is our structure in S3.
> The checkpointing interval is 5 minutes. prefix1 has 40 different values and
> prefix2 has 10000+ values.
> Within the 5-minute interval, the part files we get in these prefixes are
> no bigger than 5-10 files.
>
> Regarding PrintStream, I will figure out how to use SimpleStringEncoder on a
> Tuple, as I only need to write the tuple.f2 element to the file. If you can
> guide me on how to do it, I would appreciate it.
>
> I will try Arvid's suggestion of increasing fs.s3a.connection.maximum. I was
> trying to find out about these parameters and couldn't find them anywhere.
> Is there a place where I could look at the list of these config params?
>
> Also, I am using s3:// as the prefix. Would fs.s3a.connection.maximum affect
> that too, or is there a separate param like fs.s3.connection.maximum?
>
> On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi all,
>>
>> @Venkata, do you have many small files being created, as Arvid suggested?
>> If yes, then I tend to agree that S3 is probably not the best sink,
>> although I did not get that from your description.
>> In addition, instead of PrintStream you can have a look at the code of
>> the SimpleStringEncoder in Flink [1] for a slightly more efficient
>> implementation.
>>
>> Cheers,
>> Kostas
>>
>> [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
>>
>> On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Venkata,
>>>
>>> are the many small files intended, or is it rather an issue of our commit
>>> on checkpointing? If so, then FLINK-11499 [1] should help you. The design
>>> is close to done; unfortunately, the implementation will not make it into
>>> 1.11.
>>>
>>> In any case, I'd look at the parameter fs.s3a.connection.maximum, as
>>> you store both state and data on S3. I'd probably go with slot*3 or even
>>> higher.
>>>
>>> Lastly, the way you output elements also looks a bit suspicious.
>>> PrintStream is not known for great performance. I'm also surprised that it
>>> works without manual flushing.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11499
>>>
>>> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <jornfra...@gmail.com> wrote:
>>>
>>>> I think S3 is the wrong storage backend for this volume of small
>>>> messages.
>>>> Try to use a NoSQL database or write multiple messages into one file in
>>>> S3 (10000 or 100000).
>>>>
>>>> If you still want to go with your scenario, then try a network-optimized
>>>> instance, use s3a in Flink, and configure S3 entropy.
>>>>
>>>> On 31.05.2020 at 15:30, venkata sateesh` kolluru <vkollur...@gmail.com> wrote:
>>>>
>>>> Hi David,
>>>>
>>>> The average size of each file is around 30 KB, and I have a checkpoint
>>>> interval of 5 minutes. Some files are even 1 KB; because of checkpointing,
>>>> some files are merged into one big file of around 300 MB.
>>>>
>>>> With 120 million files and 4 TB, if the rate of transfer is 300 per
>>>> minute, it is taking weeks to write to S3.
>>>>
>>>> I have tried to increase the parallelism of the sink, but I don't see any
>>>> improvement.
>>>>
>>>> The sink record is Tuple3<String,String,String>; the actual content of the
>>>> file is f2.
>>>> This content is written to <s3 bucket>/f0/f1/part*-*
>>>>
>>>> I guess the prefix determination in CustomBucketAssigner won't be
>>>> causing this delay?
>>>>
>>>> Could you please shed some light on writing a custom S3 sink?
>>>>
>>>> Thanks
>>>>
>>>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com> wrote:
>>>>
>>>>> Hi Venkata.
>>>>>
>>>>> 300 requests per minute works out to about 200 ms per request, which
>>>>> should be a normal response time for sending a file if there isn't any
>>>>> speed limitation (how big are the files?).
>>>>>
>>>>> Have you changed the parallelism to be higher than 1? I also
>>>>> recommend limiting the source parallelism, because it can consume
>>>>> pretty fast from Kafka and create some kind of backpressure.
>>>>>
>>>>> I don't have much experience with StreamingFileSink, because I ended
>>>>> up using a custom S3Sink, but I did have some issues writing to S3
>>>>> because the requests weren't parallelised. Check this thread:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>>>
>>>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <vkollur...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have posted the same on Stack Overflow but didn't get any response,
>>>>>> so I am posting it here for help.
>>>>>>
>>>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>>>
>>>>>> Details:
>>>>>>
>>>>>> I am working on a Flink application on Kubernetes (EKS) which consumes
>>>>>> data from Kafka and writes it to S3.
>>>>>>
>>>>>> We have around 120 million XML messages, 4 TB in total, in Kafka.
>>>>>> Consuming from Kafka is super fast.
>>>>>>
>>>>>> These are just string messages from Kafka.
>>>>>>
>>>>>> There is high back pressure while writing to S3. We are not even
>>>>>> hitting the S3 PUT request limit, which is around 3500 requests/sec.
>>>>>> I am seeing only 300 writes per minute to S3, which is very slow.
>>>>>>
>>>>>> I am using StreamingFileSink to write to S3 with
>>>>>> OnCheckpointRollingPolicy as the rolling policy.
>>>>>>
>>>>>> Using flink-s3-fs-hadoop-*.jar and s3:// as the path (not using s3a
>>>>>> or s3p).
>>>>>>
>>>>>> Other than this, I don't have any config related to S3.
>>>>>>
>>>>>> StreamingFileSink<Tuple3<String, String, String>> sink =
>>>>>>     StreamingFileSink
>>>>>>         .forRowFormat(new Path("s3://BUCKET"),
>>>>>>             (Tuple3<String, String, String> element, OutputStream stream) -> {
>>>>>>                 PrintStream out = new PrintStream(stream);
>>>>>>                 out.println(element.f2);
>>>>>>             })
>>>>>>         // Determine component type for each record
>>>>>>         .withBucketAssigner(new CustomBucketAssigner())
>>>>>>         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>>>         .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>>>>>         .build();
>>>>>>
>>>>>> Is there anything that we can optimize for S3, in StreamingFileSink or
>>>>>> in flink-conf.yaml?
>>>>>>
>>>>>> Like using bulk format, or any config params like fs.s3.maxThreads, etc.
>>>>>>
>>>>>> For checkpointing too, I am using s3:// instead of s3p or s3a:
>>>>>>
>>>>>> env.setStateBackend((StateBackend) new
>>>>>>     RocksDBStateBackend("s3://checkpoint_bucket", true));
>>>>>> env.enableCheckpointing(300000);
>>>>>>
>>>
>>> --
>>> Arvid Heise | Senior Java Developer
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>> --
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>>> Stream Processing | Event Driven | Real Time
>>> --
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng

--
Arvid Heise | Senior Java Developer
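
On the question in the thread about writing only tuple.f2 without a PrintStream, here is a minimal sketch of the write logic in the style of SimpleStringEncoder: the UTF-8 bytes of the string, then a newline, written straight to the stream. To keep it runnable without Flink on the classpath, Record3 is a hypothetical stand-in for Tuple3<String, String, String>, and the class and method names are made up for illustration. In a real job the same two writes would presumably go into the encode(element, stream) method of an Encoder<Tuple3<String, String, String>> passed to forRowFormat.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class ThirdFieldEncoderSketch {

    // Hypothetical stand-in for Flink's Tuple3<String, String, String>,
    // used only so that this sketch compiles without Flink.
    public static final class Record3 {
        public final String f0;
        public final String f1;
        public final String f2;

        public Record3(String f0, String f1, String f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // Writes only f2, followed by a newline, directly to the given stream.
    // No PrintStream is allocated per record, and the stream is neither
    // flushed nor closed here.
    public static void encode(Record3 element, OutputStream stream) throws IOException {
        stream.write(element.f2.getBytes(StandardCharsets.UTF_8));
        stream.write('\n');
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encode(new Record3("prefix1", "prefix2", "<msg>hello</msg>"), out);
        encode(new Record3("prefix1", "prefix2", "<msg>world</msg>"), out);
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

f0 and f1 are ignored by the encoder on purpose, since in the setup described in the thread they are only used by the bucket assigner to build the prefix.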