Hi,
I have a requirement to have kafka streaming start at scheduled time and
then pause the stream when the consumer poll returns empty fetches for 3 or
more polls.

I am starting a consumer poll loop during application startup using a
singled thread executor and then pausing the consumer when the poll is
returning empty for 3 polls.

When the schedule kicks in , i am calling *consumer.resume.*

Is this approach correct ?
Will it cause any issue If the  consumer calls poll on a paused consumer.

Skeleton Code
============

public class *OfferItemImageConsumer* implements Runnable {

@Override
public void run() {
    try {
        do {
            ConsumerRecords<String, String> records =
kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
            writeAndPauseEmptyFetch(records);
            processRecords(records);
        } while (!consumerLoopClosed.get());
    } catch (RuntimeException ex) {
        handleConsumerLoopException(ex);
    } finally {
        kafkaConsumer.close();
    }
}


private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
    if (records.isEmpty()) {
        emptyFetchCount++;
    }
    if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
        writeImageData();
        emptyFetchCount = 0;
        kafkaConsumer.pause(kafkaConsumer.assignment());
        consumerPaused = true;
    }
}

}

=================================

public class *ItemImageStreamScheduler* {
    private static final int TERMINATION_TIMEOUT = 10;


    private ExecutorService executorService =
Executors.newSingleThreadExecutor();

    private final OfferItemImageConsumer offerItemImageConsumer;
    private final ItemImageStreamConfig itemImageStreamConfig;
    private final KafkaConsumer<String, String> kafkaConsumer;

    @EventListener(ApplicationReadyEvent.class)
    void startStreaming() {
        executorService.submit(offerItemImageConsumer);
    }
    @Scheduled
    void resumeStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }


}

Thanks

Pradeep

Reply via email to