[ https://issues.apache.org/jira/browse/KAFKA-7004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504743#comment-16504743 ]
John Roesler commented on KAFKA-7004:
-------------------------------------

Hi Shawn,

That sounds good to me. What you say seems to be in line with something [~mjsax] proposed while reviewing the same PR. I think we generally agreed with him, but it was out of scope for my PR, since it would require a KIP.

You might have seen it: [https://github.com/apache/kafka/pull/5107/files/83d5029214b506c6c1e3940784b5ee1329731bd6#diff-e870eab2dedeee93442ba87dc12609a4R81] .

So, yes, it sounds like the path is clear for you to create a KIP and PR!

Thanks,
-John

> Support configurable restore consumer poll timeout
> --------------------------------------------------
>
>                 Key: KAFKA-7004
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7004
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Shawn Nguyen
>            Priority: Minor
>              Labels: Needs-kip
>
> In the StateChangelogReader, the restore consumer is currently hard coded to
> poll for 10ms at most per call.
> {code:java}
> public Collection<TopicPartition> restore(final RestoringTasks active) {
>     if (!needsInitializing.isEmpty()) {
>         initialize();
>     }
>     if (needsRestoring.isEmpty()) {
>         restoreConsumer.unsubscribe();
>         return completed();
>     }
>     final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
>     try {
>         final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
>         for (final TopicPartition partition : restoringPartitions) {
>             restorePartition(allRecords, partition, active.restoringTaskFor(partition));
>         }
> ...{code}
>
> It'd be nice to be able to configure the restore consumer to poll for a
> larger timeout (e.g. 500ms) to give it more time to accumulate records for
> the restoration task. In the main event loop for polling in
> StreamThread.java, the main consumer uses the POLL_MS_CONFIG set in
> StreamsConfig.java to configure the max poll timeout. We could construct a
> similar config in the StreamsConfig class, but prefixed with the processing
> type (restore in this case). Let me know if this sounds reasonable, and I'll
> create a KIP and PR.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
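
For illustration only, here is a minimal sketch of the idea in the issue description: read the restore poll timeout from configuration and fall back to the current 10 ms when it is unset. The key name `restore.poll.ms`, the class `RestorePollTimeoutSketch`, and the helper `restorePollMs` are all hypothetical. None of them is an existing StreamsConfig constant or Kafka API, and the real name and validation rules would be settled in the KIP. The sketch uses plain `java.util.Properties` so it runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical sketch: resolves a configurable restore poll timeout.
class RestorePollTimeoutSketch {
    // Assumed key name: "poll.ms" prefixed with the processing type, as the issue suggests.
    static final String RESTORE_POLL_MS_CONFIG = "restore.poll.ms";
    // Same value that the restore loop currently hard-codes.
    static final long DEFAULT_RESTORE_POLL_MS = 10L;

    // Returns the configured timeout in milliseconds, or the default if the key is absent.
    static long restorePollMs(final Properties props) {
        final String raw = props.getProperty(RESTORE_POLL_MS_CONFIG);
        if (raw == null) {
            return DEFAULT_RESTORE_POLL_MS;
        }
        final long value = Long.parseLong(raw.trim());
        if (value < 0) {
            throw new IllegalArgumentException(
                RESTORE_POLL_MS_CONFIG + " must be non-negative, got " + value);
        }
        return value;
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        System.out.println(restorePollMs(props)); // prints 10 (default)
        props.setProperty(RESTORE_POLL_MS_CONFIG, "500");
        System.out.println(restorePollMs(props)); // prints 500
    }
}
```

Under this assumption, the restore loop would pass the resolved value to `restoreConsumer.poll(...)` in place of the literal `10`, so existing deployments keep today's behavior unless they set the new key.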