k-apol commented on code in PR #19913: URL: https://github.com/apache/kafka/pull/19913#discussion_r2163585695
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) { private final Object stateLock = new Object(); protected volatile State state = State.CREATED; + /** + * Initializes broker-side state. + * + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing + * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + */ + public void init() { + this.init(DEFAULT_INIT_TIMEOUT_MS); + } + + /** + * Initializes broker-side state. + * + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing + * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + * @throws TimeoutException if initialization exceeds the given timeout + */ + + public void init(final Duration timeout) { + final InitParameters initParameters = InitParameters.initParameters(); + initParameters.setTimeout(timeout); + + this.doInit(InitParameters.initParameters()); + } + + /** + * Initializes broker-side state. + * + * This methods takes parameters that specify which internal topics to setup if some + * but not all of them are absent. + * + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing + * and the given initialization parameters do not specify to setup them + * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + */ + + public void init(final InitParameters initParameters) { + this.doInit(initParameters); + } + + /** + * Initializes broker-side state. + * + * This methods takes parameters that specify which internal topics to setup if some + * but not all of them are absent. + * + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing + * and the given initialization parameters do not specify to setup them + * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + * @throws TimeoutException if initialization exceeds the given timeout + */ + public void init(final InitParameters initParameters, final Duration timeout) { + initParameters.enableTimeout(); + initParameters.setTimeout(timeout); + + this.doInit(initParameters); + } + + private void doInit(final InitParameters initParameters) { + final InternalTopicManager internalTopicManager = new InternalTopicManager(time, adminClient, applicationConfigs); + if (initParameters.hasTimeoutEnabled()) { + internalTopicManager.setInitTimeout(initParameters.getTimeout()); + } + + final Map<String, InternalTopicConfig> allInternalTopics = new HashMap<>(); + final Set<String> allSourceTopics = new HashSet<>(); + for (final Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyMap : topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) { + for (final InternalTopologyBuilder.TopicsInfo topicsInfo : subtopologyMap.values()) { + allInternalTopics.putAll(topicsInfo.stateChangelogTopics); + allInternalTopics.putAll(topicsInfo.repartitionSourceTopics); + allSourceTopics.addAll(topicsInfo.sourceTopics); + } + } + try { Review Comment: Thank you, made an update. Resolving this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org