I want to use the new 0.9 consumer for a particular application.

My use case is the following:

i) The TopicPartition I need to poll has a short log, roughly 10 minutes'
worth (log.retention.minutes is set to 10).

ii) I don't use a consumer group, i.e. I manage the partition assignment
myself.

iii) Whenever I get a new partition assigned to one of my processes
(through my own mechanism), I want to query for the current end of the log
and then seek to the beginning of the log. I then want to read straight
through until my offset reaches the end offset I queried before I started
polling. Once I've read that much data (I know the end will have moved on
by the time I've read it all), I consider myself caught up. Note: the new
consumer already lets me seek to the beginning of the log; I just need the
end-of-log offset so that I know when I have "caught up".

So questions I have are:

i) How do I get the last log offset from the Kafka consumer? The old
SimpleConsumer had a way to get this information. If I can get it from the
new consumer, I plan to do something like this:


    private boolean assignNewPartitionAndCatchUp(int newPartition) {
        final TopicPartition newTopicPartition = new TopicPartition(myTopic, newPartition);

        // Queries the existing partitions and adds this TopicPartition to the list.
        List<TopicPartition> newAssignment = createNewAssignmentByAddingPartition(newTopicPartition);
        consumer.assign(newAssignment);

        // How do I actually do this with the consumer?
        final long lastMessageOffset = getLastMessageOffset(newTopicPartition);

        consumer.seekToBeginning(newTopicPartition);

        final long timeout = 100;
        int numIterations = 0;
        boolean caughtUp = false;
        while (!caughtUp && numIterations < maxIterations) {
            ConsumerRecords<Void, byte[]> records = consumer.poll(timeout);
            numIterations += 1;
            for (ConsumerRecord<Void, byte[]> record : records) {
                // All messages are processed regularly even if they belong to other partitions.
                processRecord(record.value());
                final int partition = record.partition();
                final long offset = record.offset();
                // Only if we find that the new partition has caught up do we return.
                if (partition == newPartition && offset >= lastMessageOffset) {
                    caughtUp = true;
                }
            }
        }
        return caughtUp;
    }
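
For what it's worth, the only way I've come up with to implement
getLastMessageOffset is to abuse seekToEnd() and position(). My
(unverified) assumption is that calling position() right after
seekToEnd() forces the consumer to resolve the end-of-log offset:

    // A sketch of what I have in mind, ASSUMING that position() called
    // right after seekToEnd() resolves and returns the end-of-log offset.
    private long getLastMessageOffset(TopicPartition partition) {
        consumer.seekToEnd(partition);
        // The end-of-log offset is the offset of the *next* message to be
        // written, so the last existing message is one before it.
        return consumer.position(partition) - 1;
    }

Since I call this before consumer.seekToBeginning(newTopicPartition), the
seek to the end would be overwritten right away, which is what I want. Is
this the intended way to do it, or is there a cheaper API I'm missing?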

ii) If I ask the consumer to seek to the beginning via the
consumer.seekToBeginning(newTopicPartition) call, will it handle the case
where the log has rolled over in the meantime and what was considered the
beginning offset is no longer present? Given that my log retention is only
10 minutes and each partition will get quite a bit of traffic, I'd imagine
messages will fall out of the log quite often.
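
My assumption (please correct me if I'm wrong) is that if the beginning
offset disappears between the seek and the fetch, the broker returns an
out-of-range error and the consumer falls back on its auto.offset.reset
policy. If so, I'd configure the consumer with "earliest" so a deleted
offset just snaps forward to the oldest available message:

    // Consumer configuration sketch; the broker address is a placeholder.
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    // Fall back to the earliest available offset rather than throwing
    // OffsetOutOfRangeException when a seeked-to offset has been deleted.
    props.put("auto.offset.reset", "earliest");
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

Is that the right mental model?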

iii) What settings do I need on the Kafka broker (besides
log.retention.minutes = 10) to ensure that my partitions don't retain any
more than 10 minutes of data (a couple of extra minutes is fine)? Do I
need to tune how often Kafka checks segments for deletion eligibility? Any
other settings I should play with to ensure timely deletion?
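
In case it helps frame the question, here is what I'm currently planning
to try on the broker side (the values are illustrative guesses on my
part). My understanding is that retention only deletes whole, closed
segments, so the active segment has to roll before its data becomes
eligible for deletion:

    # server.properties sketch; values are guesses, not recommendations.
    log.retention.minutes=10
    # Force segments to roll frequently so old data doesn't sit in the
    # active segment past the retention window.
    log.roll.ms=60000
    # Check for deletable segments more often than the default (5 minutes).
    log.retention.check.interval.ms=60000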

Thanks,
Rajiv
