You don't think it's weird if I just batch in memory manually do you? I wrote a small snippet:
// Replicates Base Consumer while (true) { List<ConsumerRecord> records = consumer.poll(); // Batch records in-memory until criteria is satisfied consumer.recordBuffer.addAll(records); if (isNull(consumer.start)) { consumer.start = Instant.now(); } // Process records when either criteria is hit first if (minRecordsSatisfied.test(consumer.recordBuffer.size()) || maxTimeoutExceeded.test(consumer.start)) { consumer.processRecords(consumer.recordBuffer); } } } I have a buffer up above and some criteria like minRecords and maxTimeout that I just handle myself within the consumer directly. Do you think this is a valid approach? Thanks! On Fri, Mar 20, 2020 at 4:11 PM Eric Azama <eazama...@gmail.com> wrote: > Hi Ryan, > > If your end goal is just larger files on the server, you don't really need > to mess with the batching configs. You could just write multiple polls > worth of data to a single file. > > > On Fri, Mar 20, 2020 at 3:50 PM Liam Clarke <liam.cla...@adscale.co.nz> > wrote: > > > Hi Ryan, > > > > That'll be per poll. > > > > Kind regards, > > > > Liam Clarke > > > > On Sat, 21 Mar. 2020, 11:41 am Ryan Schachte, < > coderyanschac...@gmail.com> > > wrote: > > > > > I do see the default for message.max.bytes is set to 1MB though. That > > would > > > be for each record or each poll? > > > > > > On Fri, Mar 20, 2020 at 3:36 PM Ryan Schachte < > > coderyanschac...@gmail.com> > > > wrote: > > > > > > > Hi Liam, > > > > We are running 2.3.1. I was hoping I wouldn't need to modify anything > > at > > > > the broker level since I do not have control/access to the broker > > config, > > > > just the consumer configuration. Am I out of luck in that case? > > > > > > > > > > > > > > > > On Fri, Mar 20, 2020 at 3:27 PM Liam Clarke < > liam.cla...@adscale.co.nz > > > > > > > wrote: > > > > > > > >> Hi Ryan, > > > >> > > > >> Firstly, what version Kafka? > > > >> > > > >> Secondly check the broker's message.max.bytes and the topic's > > > >> max.message.bytes, I suspect they're set a lot lower (or not at all) > > and > > > >> will override your fetch.min.bytes. > > > >> > > > >> Cheers, > > > >> > > > >> Liam Clarke > > > >> > > > >> On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte, < > > > coderyanschac...@gmail.com > > > >> > > > > >> wrote: > > > >> > > > >> > Hey guys. > > > >> > I'm trying to maximize the amount of data I'm batching from Kafka. > > The > > > >> > output is me writing the data to a file on server. I'm adding > > > extremely > > > >> > high values to my consumer configuration and I'm still getting > > > multiple > > > >> > files written with very small file sizes. > > > >> > > > > >> > As seen below, I wait a long time to retrieve my min bytes. After > > ~20 > > > >> > seconds the poll completes with N records and writes a pretty > small > > > >> file. > > > >> > I'm interpreting that as the wait time not being respected nor is > > the > > > >> min > > > >> > bytes. Why would this be the case? > > > >> > Code: > > > >> > > > > >> > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, > > > >> args.enableAutoCommit); > > > >> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, > > args.minFetchBytes); > > > >> > props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, > > args.maxFetchBytes); > > > >> > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, > > > >> > args.maxPartitionFetchBytes); > > > >> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, > > > args.maxPollRecords); > > > >> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, > > args.maxFetchWait); > > > >> > > > > >> > Consumer configuration: > > > >> > > > > >> > --max_fetch_bytes 2147483000--min_fetch_bytes > > > >> > 2147483000--max_poll_records 2147483000--max_partition_fetch_bytes > > > >> > 2147483000--enable_auto_commit false--fetch_max_wait 900000 > > > >> > > > > >> > > > > > > > > > >