[ https://issues.apache.org/jira/browse/KAFKA-7004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-7004. ------------------------------------ Resolution: Fixed > Support configurable restore consumer poll timeout > -------------------------------------------------- > > Key: KAFKA-7004 > URL: https://issues.apache.org/jira/browse/KAFKA-7004 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 1.1.0 > Reporter: Shawn Nguyen > Priority: Minor > Labels: Needs-kip > > In the StateChangelogReader, the restore consumer is currently hard coded to > poll for 10ms at most per call. > {code:java} > public Collection<TopicPartition> restore(final RestoringTasks active) { > if (!needsInitializing.isEmpty()) { > initialize(); > } > if (needsRestoring.isEmpty()) { > restoreConsumer.unsubscribe(); > return completed(); > } > final Set<TopicPartition> restoringPartitions = new > HashSet<>(needsRestoring.keySet()); > try { > final ConsumerRecords<byte[], byte[]> allRecords = > restoreConsumer.poll(10); > for (final TopicPartition partition : restoringPartitions) { > restorePartition(allRecords, partition, > active.restoringTaskFor(partition)); > } > ...{code} > > It'd be nice to be able to configure the restore consumer to poll for a > larger timeout (e.g. 500ms) to give it more time to accumulate records for > the restoration task. In the main event loop for polling in > StreamThread.java, the main consumer uses the POLL_MS_CONFIG set in > StreamsConfig.java to configure the max poll timeout. We could construct a > similar config in the StreamsConfig class, but prefixed with the processing > type (restore in this case). Let me know if this sounds reasonable, and I'll > create a KIP and PR. -- This message was sent by Atlassian Jira (v8.3.4#803005)