That's correct! I only commit once the batch has been fully processed and confirmed successful by the external source. Thanks for the response!
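The guarantee being described here can be sketched without any Kafka dependency (all class and method names below are hypothetical, purely illustrative): the committed offset advances only after the batch is persisted successfully, so a failed persist leaves the offset untouched and the same records are simply re-read on retry — at-least-once delivery with no loss.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of "commit only after the batch is persisted".
// A plain List stands in for the partition log so the control flow is
// visible; none of these names come from the Kafka client API.
public class AtLeastOnceSketch {
    long committed = 0; // last committed offset (next offset to read)

    // Drain everything past the committed offset, persist it, then commit.
    void consumeOnce(List<String> log, Consumer<List<String>> persist) {
        List<String> batch = new ArrayList<>(log.subList((int) committed, log.size()));
        if (batch.isEmpty()) {
            return;
        }
        persist.accept(batch);     // may throw -> committed offset untouched
        committed += batch.size(); // advance only after a successful persist
    }

    public static void main(String[] args) {
        AtLeastOnceSketch sketch = new AtLeastOnceSketch();
        List<String> log = List.of("a", "b", "c");
        List<String> sink = new ArrayList<>();
        try {
            // First attempt: the datastore "fails" mid-persist.
            sketch.consumeOnce(log, batch -> {
                throw new IllegalStateException("datastore down");
            });
        } catch (IllegalStateException expected) {
            // Nothing was committed, so nothing is lost.
        }
        // Retry delivers the same records again.
        sketch.consumeOnce(log, sink::addAll);
        System.out.println(sketch.committed + " " + sink); // 3 [a, b, c]
    }
}
```

The trade-off Liam mentions also falls out of this shape: if the app dies after `persist` but before `committed` advances, the batch is written twice (duplicates, not loss).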
On Sat, Mar 21, 2020 at 11:54 PM Liam Clarke <liam.cla...@adscale.co.nz> wrote:

> Hi Ryan,
>
> So long as you're not committing/storing offsets until that batch has been
> successfully persisted, you're fine (or if you're okay with some data loss
> in the event of an app failing or being scaled down).
>
> We use that approach in a couple of our apps that persist data into
> datastores that prefer large bulk imports over smaller ones. :)
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
> On Sun, 22 Mar. 2020, 6:09 am Ryan Schachte, <coderyanschac...@gmail.com>
> wrote:
>
> > You don't think it's weird if I just batch in memory manually, do you? I
> > wrote a small snippet:
> >
> > // Replicates the base consumer loop
> > while (true) {
> >     List<ConsumerRecord<String, String>> records =
> >         consumer.poll(Duration.ofMillis(100));
> >
> >     // Batch records in memory until a flush criterion is satisfied
> >     recordBuffer.addAll(records);
> >     if (isNull(start)) {
> >         start = Instant.now();
> >     }
> >
> >     // Flush when either criterion is hit first
> >     if (minRecordsSatisfied.test(recordBuffer.size())
> >             || maxTimeoutExceeded.test(start)) {
> >         processRecords(recordBuffer);
> >         recordBuffer.clear(); // reset the batch for the next window
> >         start = null;
> >     }
> > }
> >
> > I have a buffer up above and some criteria like minRecords and maxTimeout
> > that I just handle myself within the consumer directly. Do you think this
> > is a valid approach?
> >
> > Thanks!
> >
> > On Fri, Mar 20, 2020 at 4:11 PM Eric Azama <eazama...@gmail.com> wrote:
> >
> > > Hi Ryan,
> > >
> > > If your end goal is just larger files on the server, you don't really
> > > need to mess with the batching configs. You could just write multiple
> > > polls' worth of data to a single file.
> > >
> > > On Fri, Mar 20, 2020 at 3:50 PM Liam Clarke <liam.cla...@adscale.co.nz>
> > > wrote:
> > >
> > > > Hi Ryan,
> > > >
> > > > That'll be per poll.
> > > >
> > > > Kind regards,
> > > >
> > > > Liam Clarke
> > > >
> > > > On Sat, 21 Mar. 2020, 11:41 am Ryan Schachte,
> > > > <coderyanschac...@gmail.com> wrote:
> > > >
> > > > > I do see the default for message.max.bytes is set to 1 MB, though.
> > > > > Would that be for each record or each poll?
> > > > >
> > > > > On Fri, Mar 20, 2020 at 3:36 PM Ryan Schachte
> > > > > <coderyanschac...@gmail.com> wrote:
> > > > >
> > > > > > Hi Liam,
> > > > > >
> > > > > > We are running 2.3.1. I was hoping I wouldn't need to modify
> > > > > > anything at the broker level, since I do not have control/access
> > > > > > to the broker config, just the consumer configuration. Am I out
> > > > > > of luck in that case?
> > > > > >
> > > > > > On Fri, Mar 20, 2020 at 3:27 PM Liam Clarke
> > > > > > <liam.cla...@adscale.co.nz> wrote:
> > > > > >
> > > > > > > Hi Ryan,
> > > > > > >
> > > > > > > Firstly, what version of Kafka?
> > > > > > >
> > > > > > > Secondly, check the broker's message.max.bytes and the topic's
> > > > > > > max.message.bytes. I suspect they're set a lot lower (or not at
> > > > > > > all) and will override your fetch.min.bytes.
> > > > > > >
> > > > > > > Cheers,
> > > > > > >
> > > > > > > Liam Clarke
> > > > > > >
> > > > > > > On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte,
> > > > > > > <coderyanschac...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey guys,
> > > > > > > >
> > > > > > > > I'm trying to maximize the amount of data I'm batching from
> > > > > > > > Kafka. The output is me writing the data to a file on the
> > > > > > > > server. I'm setting extremely high values in my consumer
> > > > > > > > configuration, and I'm still getting multiple files written
> > > > > > > > with very small file sizes.
> > > > > > > >
> > > > > > > > As seen below, I wait a long time to retrieve my min bytes.
> > > > > > > > After ~20 seconds the poll completes with N records and
> > > > > > > > writes a pretty small file.
> > > > > > > > I'm interpreting that as neither the wait time nor the min
> > > > > > > > bytes being respected. Why would this be the case?
> > > > > > > >
> > > > > > > > Code:
> > > > > > > >
> > > > > > > > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, args.enableAutoCommit);
> > > > > > > > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
> > > > > > > > props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
> > > > > > > > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, args.maxPartitionFetchBytes);
> > > > > > > > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
> > > > > > > > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);
> > > > > > > >
> > > > > > > > Consumer configuration:
> > > > > > > >
> > > > > > > > --max_fetch_bytes 2147483000
> > > > > > > > --min_fetch_bytes 2147483000
> > > > > > > > --max_poll_records 2147483000
> > > > > > > > --max_partition_fetch_bytes 2147483000
> > > > > > > > --enable_auto_commit false
> > > > > > > > --fetch_max_wait 900000
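For comparison, here is a more conservative configuration sketch (the byte and millisecond values are illustrative assumptions, not tuned recommendations). The broker returns a fetch response once fetch.min.bytes are available or fetch.max.wait.ms has elapsed, whichever comes first, so pinning fetch.min.bytes near Integer.MAX_VALUE just means every poll waits out the full timeout and returns whatever is buffered. Plain string keys are used so the snippet compiles without the Kafka client; they are the same config names discussed above.

```java
import java.util.Properties;

// Illustrative consumer settings aimed at larger per-poll batches.
// All numeric values here are assumptions for the sketch, not advice
// for any particular workload.
public class BatchingConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put("fetch.min.bytes", "8388608");            // wait for ~8 MB per fetch...
        props.put("fetch.max.wait.ms", "5000");             // ...or 5 s, whichever first
        props.put("fetch.max.bytes", "67108864");           // cap the whole fetch at ~64 MB
        props.put("max.partition.fetch.bytes", "16777216"); // ~16 MB per partition
        props.put("max.poll.records", "100000");
        props.put("enable.auto.commit", "false");           // commit manually after persisting
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("fetch.min.bytes")); // 8388608
    }
}
```

The key trade-off is fetch.min.bytes versus fetch.max.wait.ms: the defaults (1 byte and 500 ms) favor latency, so batching-oriented consumers raise both, then pad out batch size in application code as in the buffering snippet earlier in the thread.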