Hi, I have a requirement to start Kafka streaming at a scheduled time and then pause the stream when the consumer poll returns empty fetches for 3 or more polls.
I am starting a consumer poll loop during application startup using a single-threaded executor and then pausing the consumer when the poll returns empty for 3 polls. When the schedule kicks in, I am calling *consumer.resume*. Is this approach correct? Will it cause any issues if poll is called on a paused consumer? Skeleton Code ============ public class *OfferItemImageConsumer* implements Runnable { @Override public void run() { try { do { ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()); writeAndPauseEmptyFetch(records); processRecords(records); } while (!consumerLoopClosed.get()); } catch (RuntimeException ex) { handleConsumerLoopException(ex); } finally { kafkaConsumer.close(); } } private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) { if (records.isEmpty()) { emptyFetchCount++; } if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) { writeImageData(); emptyFetchCount = 0; kafkaConsumer.pause(kafkaConsumer.assignment()); consumerPaused = true; } } } ================================= public class *ItemImageStreamScheduler* { private static final int TERMINATION_TIMEOUT = 10; private ExecutorService executorService = Executors.newSingleThreadExecutor(); private final OfferItemImageConsumer offerItemImageConsumer; private final ItemImageStreamConfig itemImageStreamConfig; private final KafkaConsumer<String, String> kafkaConsumer; @EventListener(ApplicationReadyEvent.class) void startStreaming() { executorService.submit(offerItemImageConsumer); } @Scheduled void resumeStreaming() { kafkaConsumer.resume(kafkaConsumer.assignment()); } } Thanks Pradeep