That's correct! I only commit once the batch has been fully processed and
confirmed successful by the external source. Thanks for the response!
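
In case it's useful to anyone else on the list, here's a rough sketch of that
pattern with enable.auto.commit=false. The topic name, props, persistBatch()
and minRecords below are just placeholders for the actual persistence call and
batch criteria:

  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // props has enable.auto.commit=false
  consumer.subscribe(Collections.singletonList("my-topic"));           // placeholder topic

  List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    records.forEach(buffer::add);

    if (buffer.size() >= minRecords) {
      persistBatch(buffer);    // write the whole batch to the external store first
      consumer.commitSync();   // commit offsets only once that write is confirmed successful
      buffer.clear();
    }
  }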

On Sat, Mar 21, 2020 at 11:54 PM Liam Clarke <liam.cla...@adscale.co.nz>
wrote:

> Hi Ryan,
>
> So long as you're not committing/storing offsets until that batch has been
> successfully persisted you're fine (or if you're okay with some data loss
> in the event of an app failing or being scaled down).
>
> We use that approach in a couple of our apps that are persisting data into
> datastores that prefer large bulk imports over smaller ones. :)
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
> On Sun, 22 Mar. 2020, 6:09 am Ryan Schachte, <coderyanschac...@gmail.com>
> wrote:
>
> > You don't think it's weird if I just batch in memory manually do you? I
> > wrote a small snippet:
> >
> >   // Replicates Base Consumer
> >   while (true) {
> >     List<ConsumerRecord<String, String>> records = consumer.poll();
> >
> >     // Batch records in-memory until either criterion is satisfied
> >     consumer.recordBuffer.addAll(records);
> >     if (isNull(consumer.start)) {
> >       consumer.start = Instant.now();
> >     }
> >
> >     // Process records when either criterion is hit first
> >     if (minRecordsSatisfied.test(consumer.recordBuffer.size()) ||
> >         maxTimeoutExceeded.test(consumer.start)) {
> >       consumer.processRecords(consumer.recordBuffer);
> >       consumer.recordBuffer.clear(); // reset the batch
> >       consumer.start = null;         // restart the timeout window
> >     }
> >   }
> >
> > I have a buffer up above and some criteria like minRecords and maxTimeout
> > that I just handle myself within the consumer directly. Do you think this
> > is a valid approach?
> >
> > Thanks!
> > On Fri, Mar 20, 2020 at 4:11 PM Eric Azama <eazama...@gmail.com> wrote:
> >
> > > Hi Ryan,
> > >
> > > If your end goal is just larger files on the server, you don't really need
> > > to mess with the batching configs. You could just write multiple polls'
> > > worth of data to a single file.
> > >
> > >
> > > On Fri, Mar 20, 2020 at 3:50 PM Liam Clarke <liam.cla...@adscale.co.nz>
> > > wrote:
> > >
> > > > Hi Ryan,
> > > >
> > > > That'll be per poll.
> > > >
> > > > Kind regards,
> > > >
> > > > Liam Clarke
> > > >
> > > > On Sat, 21 Mar. 2020, 11:41 am Ryan Schachte, <coderyanschac...@gmail.com>
> > > > wrote:
> > > >
> > > > > I do see the default for message.max.bytes is set to 1 MB though. That
> > > > > would be for each record or each poll?
> > > > >
> > > > > On Fri, Mar 20, 2020 at 3:36 PM Ryan Schachte <coderyanschac...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Liam,
> > > > > > We are running 2.3.1. I was hoping I wouldn't need to modify anything
> > > > > > at the broker level since I do not have control/access to the broker
> > > > > > config, just the consumer configuration. Am I out of luck in that case?
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Mar 20, 2020 at 3:27 PM Liam Clarke <liam.cla...@adscale.co.nz>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Ryan,
> > > > > >>
> > > > > >> Firstly, what version Kafka?
> > > > > >>
> > > > > >> Secondly, check the broker's message.max.bytes and the topic's
> > > > > >> max.message.bytes; I suspect they're set a lot lower (or not at all)
> > > > > >> and will override your fetch.min.bytes.
> > > > > >>
> > > > > >> Cheers,
> > > > > >>
> > > > > >> Liam Clarke
> > > > > >>
> > > > > >> On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte, <coderyanschac...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hey guys.
> > > > > >> > I'm trying to maximize the amount of data I'm batching from Kafka.
> > > > > >> > The output is me writing the data to a file on the server. I'm adding
> > > > > >> > extremely high values to my consumer configuration and I'm still
> > > > > >> > getting multiple files written with very small file sizes.
> > > > > >> >
> > > > > >> > As seen below, I wait a long time to retrieve my min bytes. After ~20
> > > > > >> > seconds the poll completes with N records and writes a pretty small
> > > > > >> > file. I'm interpreting that as neither the wait time nor the min bytes
> > > > > >> > being respected. Why would this be the case?
> > > > > >> > Code:
> > > > > >> >
> > > > > >> > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, args.enableAutoCommit);
> > > > > >> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
> > > > > >> > props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
> > > > > >> > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, args.maxPartitionFetchBytes);
> > > > > >> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
> > > > > >> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);
> > > > > >> >
> > > > > >> > Consumer configuration:
> > > > > >> >
> > > > > >> > --max_fetch_bytes 2147483000 --min_fetch_bytes 2147483000
> > > > > >> > --max_poll_records 2147483000 --max_partition_fetch_bytes 2147483000
> > > > > >> > --enable_auto_commit false --fetch_max_wait 900000
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
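
Re: Liam's point above about the broker's message.max.bytes and the topic's
max.message.bytes: even without access to the broker config, you should be able
to read the topic-level value from the client side via the AdminClient, assuming
your client is allowed to describe topic configs. A rough sketch (bootstrap
servers and topic name are placeholders):

  static void printTopicMaxMessageBytes(String bootstrapServers, String topicName) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (AdminClient admin = AdminClient.create(props)) {
      ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
      Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
      // max.message.bytes is the topic-level override of the broker-wide message.max.bytes
      System.out.println("max.message.bytes = " + config.get("max.message.bytes").value());
    }
  }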
