You don't think it's weird if I just batch in memory manually, do you? I
wrote a small snippet:

  // Replicates the base consumer loop; consumer here is my own wrapper
  // holding the KafkaConsumer, the record buffer, and the batch start time
  while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    // Batch records in memory until a criterion is satisfied
    records.forEach(consumer.recordBuffer::add);
    if (isNull(consumer.start)) {
      consumer.start = Instant.now();
    }

    // Process the batch when either criterion is hit first
    if (minRecordsSatisfied.test(consumer.recordBuffer.size()) ||
        maxTimeoutExceeded.test(consumer.start)) {
      consumer.processRecords(consumer.recordBuffer);

      // Reset the buffer and timer so the next batch starts fresh
      consumer.recordBuffer.clear();
      consumer.start = null;
    }
  }

I have a buffer up above and some criteria like minRecords and maxTimeout
that I just handle myself within the consumer directly. Do you think this
is a valid approach?
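
For concreteness, the criteria are just predicates along these lines (a
rough sketch; minRecords and maxTimeout stand in for my actual thresholds):

  // Fires once the buffer holds at least minRecords entries
  Predicate<Integer> minRecordsSatisfied = size -> size >= minRecords;

  // Fires once maxTimeout has elapsed since the first buffered record
  Predicate<Instant> maxTimeoutExceeded = start ->
      Duration.between(start, Instant.now()).compareTo(maxTimeout) >= 0;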

Thanks!
On Fri, Mar 20, 2020 at 4:11 PM Eric Azama <eazama...@gmail.com> wrote:

> Hi Ryan,
>
> If your end goal is just larger files on the server, you don't really need
> to mess with the batching configs. You could just write multiple polls'
> worth of data to a single file.
>
>
> On Fri, Mar 20, 2020 at 3:50 PM Liam Clarke <liam.cla...@adscale.co.nz>
> wrote:
>
> > Hi Ryan,
> >
> > That'll be per poll.
> >
> > Kind regards,
> >
> > Liam Clarke
> >
> > On Sat, 21 Mar. 2020, 11:41 am Ryan Schachte, <coderyanschac...@gmail.com> wrote:
> >
> > > I do see the default for message.max.bytes is set to 1MB, though. Would
> > > that be for each record or each poll?
> > >
> > > On Fri, Mar 20, 2020 at 3:36 PM Ryan Schachte <coderyanschac...@gmail.com> wrote:
> > >
> > > > Hi Liam,
> > > > We are running 2.3.1. I was hoping I wouldn't need to modify
> > > > anything at the broker level since I do not have control/access to
> > > > the broker config, just the consumer configuration. Am I out of luck
> > > > in that case?
> > > >
> > > >
> > > >
> > > > On Fri, Mar 20, 2020 at 3:27 PM Liam Clarke <liam.cla...@adscale.co.nz> wrote:
> > > >
> > > >> Hi Ryan,
> > > >>
> > > >> Firstly, what version Kafka?
> > > >>
> > > >> Secondly, check the broker's message.max.bytes and the topic's
> > > >> max.message.bytes; I suspect they're set a lot lower (or not at all)
> > > >> and will override your fetch.min.bytes.
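> > > >>
> > > >> You can check the topic's value from the client side with the
> > > >> AdminClient, for example (an untested sketch; adminProps and the
> > > >> topic name are placeholders):
> > > >>
> > > >>   AdminClient admin = AdminClient.create(adminProps);
> > > >>   ConfigResource topic =
> > > >>       new ConfigResource(ConfigResource.Type.TOPIC, "your-topic");
> > > >>   Config config = admin.describeConfigs(Collections.singleton(topic))
> > > >>       .all().get().get(topic);
> > > >>   System.out.println(config.get("max.message.bytes").value());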
> > > >>
> > > >> Cheers,
> > > >>
> > > >> Liam Clarke
> > > >>
> > > >> On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte, <coderyanschac...@gmail.com> wrote:
> > > >>
> > > >> > Hey guys,
> > > >> > I'm trying to maximize the amount of data I'm batching from
> > > >> > Kafka. The output is written to a file on the server. I'm setting
> > > >> > extremely high values in my consumer configuration, and I'm still
> > > >> > getting multiple files written with very small file sizes.
> > > >> >
> > > >> > As seen below, I wait a long time to retrieve my min bytes. After
> > > >> > ~20 seconds the poll completes with N records and writes a pretty
> > > >> > small file. I'm interpreting that as neither the wait time nor the
> > > >> > min bytes being respected. Why would this be the case?
> > > >> > Code:
> > > >> >
> > > >> > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> > > >> >     args.enableAutoCommit);
> > > >> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
> > > >> > props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
> > > >> > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> > > >> >     args.maxPartitionFetchBytes);
> > > >> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
> > > >> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);
> > > >> >
> > > >> > Consumer configuration:
> > > >> >
> > > >> > --max_fetch_bytes 2147483000
> > > >> > --min_fetch_bytes 2147483000
> > > >> > --max_poll_records 2147483000
> > > >> > --max_partition_fetch_bytes 2147483000
> > > >> > --enable_auto_commit false
> > > >> > --fetch_max_wait 900000
> > > >> >
> > > >>
> > > >
> > >
> >
>
