[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573992#comment-16573992 ]
ASF GitHub Bot commented on KAFKA-6761: --------------------------------------- guozhangwang closed pull request #5453: MINOR: Follow up for KAFKA-6761 graph should add stores for consistency URL: https://github.com/apache/kafka/pull/5453 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index b502153bb52..788e0cb32a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -21,7 +21,9 @@ import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; +import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode; import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode; @@ -167,7 +169,7 @@ public String newStoreName(final String prefix) { } public synchronized void addStateStore(final StoreBuilder builder) { - internalTopologyBuilder.addStateStore(builder); + addGraphNode(root, new StateStoreNode(builder)); } public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder, @@ -176,16 +178,15 @@ public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeB final 
ConsumedInternal consumed, final String processorName, final ProcessorSupplier stateUpdateSupplier) { - // explicitly disable logging for global stores - storeBuilder.withLoggingDisabled(); - internalTopologyBuilder.addGlobalStore(storeBuilder, - sourceName, - consumed.timestampExtractor(), - consumed.keyDeserializer(), - consumed.valueDeserializer(), - topic, - processorName, - stateUpdateSupplier); + + final StreamsGraphNode globalStoreNode = new GlobalStoreNode(storeBuilder, + sourceName, + topic, + consumed, + processorName, + stateUpdateSupplier); + + addGraphNode(root, globalStoreNode); } public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java new file mode 100644 index 00000000000..a844de6f839 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals.graph; + +import org.apache.kafka.streams.kstream.internals.ConsumedInternal; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; + +public class GlobalStoreNode extends StateStoreNode { + + + private final String sourceName; + private final String topic; + private final ConsumedInternal consumed; + private final String processorName; + private final ProcessorSupplier stateUpdateSupplier; + + + public GlobalStoreNode(final StoreBuilder<KeyValueStore> storeBuilder, + final String sourceName, + final String topic, + final ConsumedInternal consumed, + final String processorName, + final ProcessorSupplier stateUpdateSupplier) { + + super(storeBuilder); + this.sourceName = sourceName; + this.topic = topic; + this.consumed = consumed; + this.processorName = processorName; + this.stateUpdateSupplier = stateUpdateSupplier; + } + + + @Override + @SuppressWarnings("unchecked") + public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { + storeBuilder.withLoggingDisabled(); + topologyBuilder.addGlobalStore(storeBuilder, + sourceName, + consumed.timestampExtractor(), + consumed.keyDeserializer(), + consumed.valueDeserializer(), + topic, + processorName, + stateUpdateSupplier); + + } + + + @Override + public String toString() { + return "GlobalStoreNode{" + + "sourceName='" + sourceName + '\'' + + ", topic='" + topic + '\'' + + ", processorName='" + processorName + '\'' + + "} "; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java new file mode 100644 index 00000000000..a034106912f --- /dev/null +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals.graph; + +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.state.StoreBuilder; + +public class StateStoreNode extends StreamsGraphNode { + + protected final StoreBuilder storeBuilder; + + public StateStoreNode(final StoreBuilder storeBuilder) { + super(storeBuilder.toString(), false); + + this.storeBuilder = storeBuilder; + } + + @Override + public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { + + topologyBuilder.addStateStore(storeBuilder); + } + + @Override + public String toString() { + return "StateStoreNode{" + + " name='" + storeBuilder.name() + '\'' + + ", logConfig=" + storeBuilder.logConfig() + + ", loggingEnabled='" + storeBuilder.loggingEnabled() + '\'' + + "} "; + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reduce Kafka Streams Footprint > ------------------------------ > > Key: KAFKA-6761 > URL: https://issues.apache.org/jira/browse/KAFKA-6761 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Bill Bejeck > Assignee: Bill Bejeck > Priority: Major > Fix For: 2.1.0 > > > The persistent storage footprint of a Kafka Streams application consists of the > following aspects: > # The internal topics created on the Kafka cluster side. > # The materialized state stores on the Kafka Streams application instances > side. > There have been some questions about reducing these footprints, especially > since many of them are not necessary. For example, there are redundant > internal topics, as well as unnecessary state stores that take up space and > also affect performance. When people push Streams to production with > high traffic, this issue becomes more common and severe. Reducing the > footprint of Streams has clear benefits: it reduces the resource utilization of > Kafka Streams applications and avoids putting pressure on the brokers' > capacity. -- This message was sent by Atlassian JIRA (v7.6.3#76005)