guozhangwang commented on code in PR #12337:
URL: https://github.com/apache/kafka/pull/12337#discussion_r912542741
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -697,7 +693,7 @@ private Map<TopicPartition, Long>
committedOffsetForChangelogs(final Map<TaskId,
final
Set<TopicPartition> partitions) {
final Map<TopicPartition, Long> committedOffsets;
try {
- committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
Review Comment:
Hi @cadonna I spent some time trying out the idea, but it did not work very
elegantly. The main reason is that we have both `committedOffsetForChangelogs`
and `endOffsetForChangelogs` which may throw `ExecutionException` /
`TimeoutException` and `InterruptedException`. And if we want to let the
`InterruptedException` to rethrow, like you said we need to let the caller to
handle that, but the current logic is that we would swallow and record the task
idleness, so to be compatible we'd still need to do so, which means that we
would end up doing similar things for the stream thread.
So I suggest we do what you suggested after we completed moving the store
changelog reader to the restore thread, and then change the semantics as
"record task idleness for retriable execution and timeout exception, but
re-throw for interrupted exception". WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]