[ 
https://issues.apache.org/jira/browse/KAFKA-7004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503912#comment-16503912
 ] 

John Roesler commented on KAFKA-7004:
-------------------------------------

Hi Shawn,

Thanks for the report.

Coincidentally, I think this would be fixed by a PR that I already have in 
progress: 
[https://github.com/apache/kafka/pull/5107/files#diff-46ed6d177221c8778965ecb1b6657be3R88]

Do you agree?

Thanks,

-John

> Support configurable restore consumer poll timeout
> --------------------------------------------------
>
>                 Key: KAFKA-7004
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7004
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Shawn Nguyen
>            Priority: Minor
>              Labels: Needs-kip
>
> In the StateChangelogReader, the restore consumer is currently hard coded to 
> poll for 10ms at most per call.
> {code:java}
>  public Collection<TopicPartition> restore(final RestoringTasks active) {
>     if (!needsInitializing.isEmpty()) {
>         initialize();
>     }
>     if (needsRestoring.isEmpty()) {
>         restoreConsumer.unsubscribe();
>         return completed();
>     }
>     final Set<TopicPartition> restoringPartitions = new 
> HashSet<>(needsRestoring.keySet());
>     try {
>         final ConsumerRecords<byte[], byte[]> allRecords = 
> restoreConsumer.poll(10);
>         for (final TopicPartition partition : restoringPartitions) {
>             restorePartition(allRecords, partition, 
> active.restoringTaskFor(partition));
>         }
>     ...{code}
>  
> It'd be nice to be able to configure the restore consumer to poll for a 
> larger timeout (e.g. 500ms) to give it more time to accumulate records for 
> the restoration task. In the main event loop for polling in 
> StreamThread.java, the main consumer uses the POLL_MS_CONFIG set in 
> StreamsConfig.java to configure the max poll timeout. We could construct a 
> similar config in the StreamsConfig class, but prefixed with the consumer 
> type (restore in this case). Let me know if this sounds reasonable, and I'll 
> create a KIP and PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to