[ https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15716971#comment-15716971 ]
Guozhang Wang commented on KAFKA-4322:
--------------------------------------

I think I agree that some app-specific logic during restoration cannot simply be wrapped in library logging or metrics, and as one of the "original developers" I'm happy to take the blame :)

One way I can think of to remedy the backward-incompatibility issue is to add a new interface like the following:

{code}
public interface AdvancedStateRestoreCallback extends StateRestoreCallback {

    void beginRestore(StateRestoreCallbackContext context);

    void endRestore(StateRestoreCallbackContext context);
}
{code}

Then {{ProcessorStateManager}} would check dynamically whether the restore callback is an {{AdvancedStateRestoreCallback}} and trigger these functions accordingly (by the way, I'm not married to the interface name; it is just for demonstration).

What do you think, [~markshelton]? Would you like to file a KIP along with updating the PR?


> StateRestoreCallback begin and end indication
> ---------------------------------------------
>
>                 Key: KAFKA-4322
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4322
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Mark Shelton
>            Assignee: Mark Shelton
>            Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single
> method "restore(byte[] key, byte[] value)" that is called for every key-value
> pair to be restored.
> It would be nice to have "beginRestore" and "endRestore" methods as part of
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would
> call "endRestore" when it determines that it is done. This allows an
> implementation, for example, to report on the number of keys restored and
> perform a commit after the last key was restored. Other uses are conceivable.
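
For illustration, the dynamic check proposed in the comment above could look roughly like the sketch below. It is self-contained and does not use the Kafka Streams classes: the interfaces are local stand-ins, {{StateRestoreCallbackContext}} is named in the proposal but its contents ({{storeName()}} here) are an assumption, and {{restoreAll}} and {{CountingCallback}} are hypothetical names standing in for the restore loop in {{ProcessorStateManager}} and for an application callback.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RestoreSketch {

    // Local stand-in with the same single method as the existing callback.
    public interface StateRestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    // Named in the proposal; what it exposes is an assumption made here.
    public interface StateRestoreCallbackContext {
        String storeName();
    }

    // The interface proposed in the comment (name not final).
    public interface AdvancedStateRestoreCallback extends StateRestoreCallback {
        void beginRestore(StateRestoreCallbackContext context);
        void endRestore(StateRestoreCallbackContext context);
    }

    // Hypothetical restore loop: plain callbacks keep working unchanged,
    // advanced callbacks additionally get begin/end notifications.
    public static void restoreAll(StateRestoreCallback callback,
                                  StateRestoreCallbackContext context,
                                  List<byte[][]> records) {
        AdvancedStateRestoreCallback advanced =
                callback instanceof AdvancedStateRestoreCallback
                        ? (AdvancedStateRestoreCallback) callback
                        : null;
        if (advanced != null) {
            advanced.beginRestore(context);
        }
        for (byte[][] kv : records) {
            callback.restore(kv[0], kv[1]);
        }
        if (advanced != null) {
            advanced.endRestore(context);
        }
    }

    // Example application callback: counts restored keys and records call order.
    public static class CountingCallback implements AdvancedStateRestoreCallback {
        public final List<String> events = new ArrayList<>();
        public int restored = 0;

        @Override
        public void beginRestore(StateRestoreCallbackContext context) {
            restored = 0;
            events.add("begin:" + context.storeName());
        }

        @Override
        public void restore(byte[] key, byte[] value) {
            restored++;
            events.add("restore");
        }

        @Override
        public void endRestore(StateRestoreCallbackContext context) {
            events.add("end:" + context.storeName());
        }
    }

    public static void main(String[] args) {
        List<byte[][]> records = new ArrayList<>();
        records.add(new byte[][] {
                "k1".getBytes(StandardCharsets.UTF_8),
                "v1".getBytes(StandardCharsets.UTF_8) });
        CountingCallback callback = new CountingCallback();
        restoreAll(callback, () -> "demo-store", records);
        System.out.println("restored keys: " + callback.restored);
    }
}
```

Because the check is an {{instanceof}} test on the callback, existing {{StateRestoreCallback}} implementations would not need to change; only callbacks that opt into the extended interface see the extra calls.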