Hi Pradeep,

The poll, pause and resume calls need to happen in the same thread, inside the same while loop.
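
Manoj's point can be sketched as a loop in which only the polling thread ever touches the consumer. It combines with Matthias's `AtomicBoolean needToResume` suggestion, quoted further down in this thread.

To keep the sketch runnable without a broker, it uses only the JDK. `PausableSource`, `PollLoop`, `requestResume()` and the other names are hypothetical. In a real application:
- `poll()` would wrap `kafkaConsumer.poll(Duration)`.
- `pauseAll()` would call `kafkaConsumer.pause(kafkaConsumer.assignment())`.
- `resumeAll()` would call `kafkaConsumer.resume(kafkaConsumer.paused())`.
- The `@Scheduled` method would call `requestResume()` instead of calling the consumer directly.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the KafkaConsumer calls discussed in this thread:
// poll(...), pause(assignment()) and resume(paused()).
interface PausableSource<T> {
    List<T> poll();
    void pauseAll();
    void resumeAll();
}

// Every call on `source` happens on the thread that runs run()/runOnce().
class PollLoop<T> implements Runnable {
    private final PausableSource<T> source;
    private final int emptyFetchThreshold;

    // Shared with other threads: a scheduler sets these, the poll thread reads them.
    private final AtomicBoolean resumeRequested = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Owned by the poll thread only, so no synchronization is needed.
    private int emptyFetchCount = 0;
    private boolean paused = false;

    PollLoop(PausableSource<T> source, int emptyFetchThreshold) {
        this.source = source;
        this.emptyFetchThreshold = emptyFetchThreshold;
    }

    // Safe to call from a scheduler thread: it only flips a flag.
    void requestResume() {
        resumeRequested.set(true);
    }

    void close() {
        closed.set(true);
    }

    // Intended to be read on the poll thread (e.g. for logging or tests).
    boolean isPaused() {
        return paused;
    }

    // One iteration of the poll loop; returns the records for processing.
    List<T> runOnce() {
        // Apply a pending resume request on this thread, before polling.
        if (resumeRequested.getAndSet(false) && paused) {
            source.resumeAll();
            paused = false;
            emptyFetchCount = 0;
        }

        // Keep polling while paused: paused partitions return no records,
        // but regular polls keep the consumer within max.poll.interval.ms.
        List<T> records = source.poll();

        // Count consecutive empty fetches; pause once the threshold is reached.
        emptyFetchCount = records.isEmpty() ? emptyFetchCount + 1 : 0;
        if (!paused && emptyFetchCount >= emptyFetchThreshold) {
            source.pauseAll();
            paused = true;
            emptyFetchCount = 0;
        }
        return records;
    }

    @Override
    public void run() {
        while (!closed.get()) {
            List<T> records = runOnce();
            // Hand `records` to the application's processing step here.
        }
    }
}
```

Two details are assumptions about the intended behaviour:
- The counter resets on a non-empty poll, so pausing requires 3 *consecutive* empty fetches.
- The threshold test uses `>=`.

Pradeep's skeleton, by contrast, only increments the counter and compares with `>`.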

If a scheduler is the trigger for pause or resume, do not call pause/resume from the scheduler thread. Instead, set a variable in the class that has the poll loop. The poll loop can check the variable and pause/resume as necessary.

For the rebalance scenario, you should implement the ConsumerRebalanceListener interface and register it with the consumer when you call subscribe(). It will get called, on the polling thread from inside poll(), when partitions are assigned or revoked. There you can call pause or resume again.

Hope this helps

regards

On Thu, Oct 25, 2018 at 6:11 PM pradeep s <sreekumar.prad...@gmail.com> wrote:

> Hi Manoj/Matthias,
> My requirement is to run the consumer once a day, stream the messages, and pause when I encounter a few empty fetches.
> I am planning to run two consumers and pause consumption based on the empty fetches, for a topic with 4 partitions.
> To avoid the consumer multi-threaded access issue, I am running the consumer, exiting the poll loop, and calling pause on the same thread. In this case, I will not be polling continuously.
> When the next schedule kicks in, I will resume polling.
> Will the resume call cause issues, since the scheduled job is triggered a long time after polling stopped? (Or is the old approach of continuous polling the correct one?)
> Also, Manoj, can you please explain the rebalance scenario: if the consumer is paused for two partitions and gets assigned another two partitions (because of a pod termination), how can I pause consumption if it is not the scheduled time to process the records?
> Thanks
> Pradeep
>
> On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar <khangaon...@gmail.com> wrote:
>
> > One thing to be aware of with pause and resume is that they apply only to the partitions currently assigned to the consumer.
> >
> > But partitions can get revoked, or additional partitions can get assigned to the consumer.
> >
> > With reassignment, you might expect the consumer to be paused but suddenly start getting messages because a new partition got assigned.
> >
> > Use the ConsumerRebalanceListener to pause or resume any new partitions.
> >
> > regards
> >
> > On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > > That is correct: clients are not thread safe.
> > >
> > > You can use an `AtomicBoolean needToResume` that is shared between both threads and is initially false.
> > >
> > > In your scheduled method, you set the variable to true.
> > >
> > > In your main consumer loop, each time before you call poll(), you check whether the variable is set to true. If it is, you call resume() and reset the variable to false.
> > >
> > > Hope this helps.
> > >
> > > -Matthias
> > >
> > > On 10/25/18 2:09 PM, pradeep s wrote:
> > > > Thanks Matthias. I am facing the issue when I try to call resume from the scheduled method.
> > > > I was getting an exception that KafkaConsumer is not safe for multi-threaded access. I am trying to see how I can call pause and resume on the same thread. There will be only one thread running for consumption.
> > > >
> > > > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >
> > > >> There is no issue if you call `poll()` when all partitions are paused. In fact, if you want to make sure that the consumer does not fall out of the consumer group, you must call `poll()` at regular intervals so that you do not hit the `max.poll.interval.ms` timeout.
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 10/24/18 10:25 AM, pradeep s wrote:
> > > >>> Pause and resume are required since I am running a pod in Kubernetes and I am not shutting down the app.
> > > >>>
> > > >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <sreekumar.prad...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi,
> > > >>>> I have a requirement to start Kafka streaming at a scheduled time and then pause the stream when the consumer poll returns empty fetches for 3 or more polls.
> > > >>>>
> > > >>>> I am starting a consumer poll loop during application startup using a single-thread executor, and then pausing the consumer when the poll returns empty for 3 polls.
> > > >>>>
> > > >>>> When the schedule kicks in, I am calling *consumer.resume*.
> > > >>>>
> > > >>>> Is this approach correct?
> > > >>>> Will it cause any issue if poll is called on a paused consumer?
> > > >>>>
> > > >>>> Skeleton Code
> > > >>>> ============
> > > >>>>
> > > >>>> public class OfferItemImageConsumer implements Runnable {
> > > >>>>
> > > >>>>     @Override
> > > >>>>     public void run() {
> > > >>>>         try {
> > > >>>>             do {
> > > >>>>                 ConsumerRecords<String, String> records =
> > > >>>>                         kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> > > >>>>                 writeAndPauseEmptyFetch(records);
> > > >>>>                 processRecords(records);
> > > >>>>             } while (!consumerLoopClosed.get());
> > > >>>>         } catch (RuntimeException ex) {
> > > >>>>             handleConsumerLoopException(ex);
> > > >>>>         } finally {
> > > >>>>             kafkaConsumer.close();
> > > >>>>         }
> > > >>>>     }
> > > >>>>
> > > >>>>     private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
> > > >>>>         if (records.isEmpty()) {
> > > >>>>             emptyFetchCount++;
> > > >>>>         }
> > > >>>>         if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
> > > >>>>             writeImageData();
> > > >>>>             emptyFetchCount = 0;
> > > >>>>             kafkaConsumer.pause(kafkaConsumer.assignment());
> > > >>>>             consumerPaused = true;
> > > >>>>         }
> > > >>>>     }
> > > >>>>
> > > >>>> }
> > > >>>>
> > > >>>> =================================
> > > >>>>
> > > >>>> public class ItemImageStreamScheduler {
> > > >>>>     private static final int TERMINATION_TIMEOUT = 10;
> > > >>>>
> > > >>>>     private ExecutorService executorService = Executors.newSingleThreadExecutor();
> > > >>>>
> > > >>>>     private final OfferItemImageConsumer offerItemImageConsumer;
> > > >>>>     private final ItemImageStreamConfig itemImageStreamConfig;
> > > >>>>     private final KafkaConsumer<String, String> kafkaConsumer;
> > > >>>>
> > > >>>>     @EventListener(ApplicationReadyEvent.class)
> > > >>>>     void startStreaming() {
> > > >>>>         executorService.submit(offerItemImageConsumer);
> > > >>>>     }
> > > >>>>
> > > >>>>     @Scheduled
> > > >>>>     void resumeStreaming() {
> > > >>>>         // Runs on the scheduler thread, not the poll-loop thread.
> > > >>>>         kafkaConsumer.resume(kafkaConsumer.assignment());
> > > >>>>     }
> > > >>>>
> > > >>>> }
> > > >>>>
> > > >>>> Thanks
> > > >>>>
> > > >>>> Pradeep
> >
> > --
> > http://khangaonkar.blogspot.com/
>

--
http://khangaonkar.blogspot.com/
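
Manoj's rebalance advice at the top of this thread can be sketched as a listener. Whenever the loop is in its paused state, it re-applies the pause to whatever partitions arrive in `onPartitionsAssigned`. That covers Pradeep's case of a pod terminating and its partitions moving to a paused consumer.

To keep the sketch runnable without a broker, it is a JDK-only model. The class name `PauseAwareRebalanceListener`, the type parameter `P` (standing in for `TopicPartition`) and the `pauser` callback (standing in for `kafkaConsumer::pause`) are hypothetical. In real code, the same two methods would go into an implementation of `ConsumerRebalanceListener` passed to `kafkaConsumer.subscribe(topics, listener)`.

```java
import java.util.Collection;

// JDK-only model of the two ConsumerRebalanceListener callbacks.
// P stands in for TopicPartition; `pauser` stands in for kafkaConsumer::pause.
class PauseAwareRebalanceListener<P> {
    private final java.util.function.Consumer<Collection<P>> pauser;

    // Set by the poll loop when it pauses or resumes. Kafka runs rebalance
    // callbacks on the polling thread inside poll(), so a plain field is enough.
    private boolean pausedMode = false;

    PauseAwareRebalanceListener(java.util.function.Consumer<Collection<P>> pauser) {
        this.pauser = pauser;
    }

    void setPausedMode(boolean pausedMode) {
        this.pausedMode = pausedMode;
    }

    void onPartitionsRevoked(Collection<P> partitions) {
        // Nothing to undo for pausing; commit offsets here if the app needs to.
    }

    void onPartitionsAssigned(Collection<P> partitions) {
        // Newly assigned partitions start out unpaused, so re-apply the pause
        // to them while the loop is in its paused state.
        if (pausedMode && !partitions.isEmpty()) {
            pauser.accept(partitions);
        }
    }
}
```

The poll loop would call `setPausedMode(true)` right after it pauses, and `setPausedMode(false)` right after it resumes. Partitions that arrive in between then stay paused until the next scheduled resume.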