[ 
https://issues.apache.org/jira/browse/KAFKA-7004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504011#comment-16504011
 ] 

Shawn Nguyen commented on KAFKA-7004:
-------------------------------------

Thanks [~vvcephei]. I took a pass at your PR, and it does make things a lot 
better: the restore path now uses the global poll timeout passed in the streams 
config instead of a hardcoded 10ms. Long term, though, I think we still want to 
separate the timeouts by processing type (main or restore) and make each 
processing type's poll timeout its own configuration.
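
To make the idea concrete, here is a rough sketch of how a per-type timeout 
could be resolved, falling back to the global setting when no override is 
given. The "main." and "restore." prefixes and the PollTimeouts class are 
hypothetical names for illustration only, not existing StreamsConfig keys; 
"poll.ms" is the existing global key. It uses a plain Map so it runs without 
the Kafka jars.

```java
import java.util.HashMap;
import java.util.Map;

final class PollTimeouts {
    // Existing global key (StreamsConfig.POLL_MS_CONFIG).
    static final String POLL_MS = "poll.ms";
    // Hypothetical per-processing-type prefixes; these are NOT existing config keys.
    static final String MAIN_PREFIX = "main.";
    static final String RESTORE_PREFIX = "restore.";
    // Used when neither key is set; 100 ms mirrors the documented poll.ms default.
    static final long DEFAULT_POLL_MS = 100L;

    // Returns the prefixed value if present, else the global poll.ms, else the default.
    static long pollMs(final Map<String, ?> configs, final String prefix) {
        Object value = configs.get(prefix + POLL_MS);
        if (value == null) {
            value = configs.get(POLL_MS);
        }
        final long ms = value == null
            ? DEFAULT_POLL_MS
            : Long.parseLong(value.toString().trim());
        if (ms < 0) {
            throw new IllegalArgumentException("poll timeout must be >= 0, got " + ms);
        }
        return ms;
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put("restore.poll.ms", 500); // override for restore only
        System.out.println("restore=" + pollMs(configs, RESTORE_PREFIX));
        System.out.println("main=" + pollMs(configs, MAIN_PREFIX));
    }
}
```

With something like this, the restore loop could call 
restoreConsumer.poll(pollMs(configs, RESTORE_PREFIX)) while the main loop keeps 
its current behavior unless "main.poll.ms" is set. The exact key names would be 
settled in the KIP.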

> Support configurable restore consumer poll timeout
> --------------------------------------------------
>
>                 Key: KAFKA-7004
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7004
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Shawn Nguyen
>            Priority: Minor
>              Labels: Needs-kip
>
> In the StoreChangelogReader, the restore consumer is currently hardcoded to 
> poll for at most 10ms per call.
> {code:java}
>  public Collection<TopicPartition> restore(final RestoringTasks active) {
>     if (!needsInitializing.isEmpty()) {
>         initialize();
>     }
>     if (needsRestoring.isEmpty()) {
>         restoreConsumer.unsubscribe();
>         return completed();
>     }
>     final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
>     try {
>         final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
>         for (final TopicPartition partition : restoringPartitions) {
>             restorePartition(allRecords, partition, active.restoringTaskFor(partition));
>         }
>         }
>     ...{code}
>  
> It'd be nice to be able to configure the restore consumer to poll with a 
> larger timeout (e.g. 500ms) to give it more time to accumulate records for 
> the restoration task. In the main event loop in StreamThread.java, the main 
> consumer uses the POLL_MS_CONFIG defined in StreamsConfig.java as its max 
> poll timeout. We could add a similar config to the StreamsConfig class, 
> prefixed with the processing type (restore in this case). Let me know if 
> this sounds reasonable, and I'll create a KIP and PR.



