Hi Ryan,

So long as you don't commit/store offsets until that batch has been successfully persisted, you're fine. (Committing earlier is also fine if you're okay with some data loss in the event of an app failing or being scaled down.)
We use that approach in a couple of our apps that are persisting data into datastores that prefer large bulk imports over smaller ones. :)

Kind regards,

Liam Clarke-Hutchinson

On Sun, 22 Mar. 2020, 6:09 am Ryan Schachte, <coderyanschac...@gmail.com> wrote:

> You don't think it's weird if I just batch in memory manually do you? I
> wrote a small snippet:
>
> // Replicates Base Consumer
> while (true) {
>   List<ConsumerRecord> records = consumer.poll();
>
>   // Batch records in-memory until criteria is satisfied
>   consumer.recordBuffer.addAll(records);
>   if (isNull(consumer.start)) {
>     consumer.start = Instant.now();
>   }
>
>   // Process records when either criteria is hit first
>   if (minRecordsSatisfied.test(consumer.recordBuffer.size()) ||
>       maxTimeoutExceeded.test(consumer.start)) {
>     consumer.processRecords(consumer.recordBuffer);
>   }
> }
> }
>
> I have a buffer up above and some criteria like minRecords and maxTimeout
> that I just handle myself within the consumer directly. Do you think this
> is a valid approach?
>
> Thanks!
>
> On Fri, Mar 20, 2020 at 4:11 PM Eric Azama <eazama...@gmail.com> wrote:
>
> > Hi Ryan,
> >
> > If your end goal is just larger files on the server, you don't really need
> > to mess with the batching configs. You could just write multiple polls
> > worth of data to a single file.
> >
> >
> > On Fri, Mar 20, 2020 at 3:50 PM Liam Clarke <liam.cla...@adscale.co.nz>
> > wrote:
> >
> > > Hi Ryan,
> > >
> > > That'll be per poll.
> > >
> > > Kind regards,
> > >
> > > Liam Clarke
> > >
> > > On Sat, 21 Mar. 2020, 11:41 am Ryan Schachte, <coderyanschac...@gmail.com>
> > > wrote:
> > >
> > > > I do see the default for message.max.bytes is set to 1MB though. That would
> > > > be for each record or each poll?
> > > >
> > > > On Fri, Mar 20, 2020 at 3:36 PM Ryan Schachte <coderyanschac...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Liam,
> > > > > We are running 2.3.1.
> > > > > I was hoping I wouldn't need to modify anything at
> > > > > the broker level since I do not have control/access to the broker config,
> > > > > just the consumer configuration. Am I out of luck in that case?
> > > > >
> > > > > On Fri, Mar 20, 2020 at 3:27 PM Liam Clarke <liam.cla...@adscale.co.nz>
> > > > > wrote:
> > > > >
> > > > >> Hi Ryan,
> > > > >>
> > > > >> Firstly, what version Kafka?
> > > > >>
> > > > >> Secondly check the broker's message.max.bytes and the topic's
> > > > >> max.message.bytes, I suspect they're set a lot lower (or not at all) and
> > > > >> will override your fetch.min.bytes.
> > > > >>
> > > > >> Cheers,
> > > > >>
> > > > >> Liam Clarke
> > > > >>
> > > > >> On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte, <coderyanschac...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > Hey guys.
> > > > >> > I'm trying to maximize the amount of data I'm batching from Kafka. The
> > > > >> > output is me writing the data to a file on server. I'm adding extremely
> > > > >> > high values to my consumer configuration and I'm still getting multiple
> > > > >> > files written with very small file sizes.
> > > > >> >
> > > > >> > As seen below, I wait a long time to retrieve my min bytes. After ~20
> > > > >> > seconds the poll completes with N records and writes a pretty small file.
> > > > >> > I'm interpreting that as the wait time not being respected nor is the
> > > > >> > min bytes. Why would this be the case?
> > > > >> >
> > > > >> > Code:
> > > > >> >
> > > > >> > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, args.enableAutoCommit);
> > > > >> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
> > > > >> > props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
> > > > >> > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> > > > >> >     args.maxPartitionFetchBytes);
> > > > >> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
> > > > >> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);
> > > > >> >
> > > > >> > Consumer configuration:
> > > > >> >
> > > > >> > --max_fetch_bytes 2147483000
> > > > >> > --min_fetch_bytes 2147483000
> > > > >> > --max_poll_records 2147483000
> > > > >> > --max_partition_fetch_bytes 2147483000
> > > > >> > --enable_auto_commit false
> > > > >> > --fetch_max_wait 900000
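
For reference, here is a minimal sketch of the in-memory batching discussed at the top of this thread: records from several polls are buffered and handed off once either a record-count or an age threshold is reached. The `BatchBuffer` class and its `addAll`, `shouldFlush`, and `flush` methods are hypothetical names made up for illustration, not part of the Kafka client or of Ryan's code. The Kafka consumer itself is deliberately left out so the sketch runs on its own, and the current time is passed in so the age check can be exercised without waiting.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: accumulates records across polls until a size or age threshold is hit. */
final class BatchBuffer<T> {
    private final int minRecords;
    private final Duration maxAge;
    private final List<T> buffer = new ArrayList<>();
    private Instant oldestAt; // arrival time of the oldest buffered record; null while empty

    BatchBuffer(int minRecords, Duration maxAge) {
        this.minRecords = minRecords;
        this.maxAge = maxAge;
    }

    /** Adds one poll's worth of records; the age timer starts when the buffer goes non-empty. */
    void addAll(List<T> records, Instant now) {
        if (records.isEmpty()) {
            return;
        }
        if (buffer.isEmpty()) {
            oldestAt = now;
        }
        buffer.addAll(records);
    }

    /** True when either criterion is met, whichever comes first. */
    boolean shouldFlush(Instant now) {
        if (buffer.isEmpty()) {
            return false;
        }
        boolean enoughRecords = buffer.size() >= minRecords;
        boolean tooOld = Duration.between(oldestAt, now).compareTo(maxAge) >= 0;
        return enoughRecords || tooOld;
    }

    /**
     * Hands a copy of the batch to the sink and clears the buffer only if the
     * sink returns normally. If the sink throws, the records stay buffered.
     * Returns the number of records flushed.
     */
    int flush(Consumer<List<T>> sink) {
        List<T> batch = new ArrayList<>(buffer);
        sink.accept(batch);
        buffer.clear();
        oldestAt = null;
        return batch.size();
    }

    int size() {
        return buffer.size();
    }
}
```

In a real consumer loop this would presumably sit between `consumer.poll(...)` and an explicit `consumer.commitSync()`, with `enable.auto.commit` set to false: add each poll's records, and when `shouldFlush` returns true, call `flush` with the file or datastore writer and commit offsets only after it returns. Clearing the buffer and resetting the timer after a successful flush is what keeps the next batch from re-processing old records.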