mjsax commented on code in PR #18778: URL: https://github.com/apache/kafka/pull/18778#discussion_r1938398744
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########

```diff
@@ -918,38 +630,96 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
     }

     /**
-     * Adds a read-only {@link StateStore} to the topology.
-     * <p>
-     * A read-only {@link StateStore} does not create a dedicated changelog topic but uses it's input topic as
-     * changelog; thus, the used topic should be configured with log compaction.
-     * <p>
-     * The <code>auto.offset.reset</code> property will be set to <code>earliest</code> for this topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create a processor for all messages received
-     * from the given topic. This processor should contain logic to keep the {@link StateStore} up-to-date.
-     *
-     * @param storeBuilder          user defined store builder
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param timestampExtractor    the stateless timestamp extractor used for this source,
-     *                              if not specified the default extractor defined in the configs will be used
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already registered
-     */
-    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
-                                                                  final String sourceName,
-                                                                  final TimestampExtractor timestampExtractor,
-                                                                  final Deserializer<KIn> keyDeserializer,
-                                                                  final Deserializer<VIn> valueDeserializer,
-                                                                  final String topic,
-                                                                  final String processorName,
-                                                                  final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
-        storeBuilder.withLoggingDisabled();
+     * Adds a read-only {@link StateStore state store} to the topology.
+     * The state store will be populated with data from the named source topic.
+     * State stores are sharded and the number of shards is determined at runtime by the number of input topic
+     * partitions for the source topic <em>and</em> the connected processors (if any).
+     * Read-only state stores can be accessed from "outside" using "Interactive Queries" (cf.,
+     * {@link KafkaStreams#store(StoreQueryParameters)} and {@link KafkaStreams#query(StateQueryRequest)}).
+     *
+     * <p>The {@code auto.offset.reset} property will be set to {@code "earliest"} for the source topic.
+     * If you want to specify a source specific {@link TimestampExtractor} you can use
+     * {@link #addReadOnlyStateStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier)}.
+     *
+     * <p>{@link #connectProcessorAndStateStores(String, String...) Connecting} a read-only state store to
+     * {@link #addProcessor(String, ProcessorSupplier, String...) processors} is optional.
+     * If not connected to any processor, the state store will still be created and can be queried via
+     * {@link KafkaStreams#store(StoreQueryParameters)} or {@link KafkaStreams#query(StateQueryRequest)}.
+     * If the state store is connected to another processor, each corresponding {@link Processor} instance in the
+     * topology has <em>read-only</em> access to a single shard of the state store.
+     * If you need write access to a state store, you can use a
+     * {@link #addStateStore(StoreBuilder, String...) "regular" state store} instead.
+     * If you need access to all data in a state store inside a {@link Processor}, you can use a (read-only)
+     * {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)
+     * global state store}.
+     *
+     * <p>The provided {@link ProcessorSupplier} will be used to create {@link Processor} instances which will be used
+     * to process the records from the source topic.
+     * These {@link Processor processors} are the only ones with <em>write</em> access to the state store,
+     * and should contain logic to keep the {@link StateStore} up-to-date.
+     *
+     * <p>Read-only state stores are always enabled for fault-tolerance and recovery.
+     * In contrast to {@link #addStateStore(StoreBuilder, String...) "regular" state stores} no dedicated changelog
+     * topic will be created in Kafka though, but the source topic is used for recovery.
+     * Thus, the source topic should be configured with log compaction.
+     *
+     * @param storeBuilder
+     *        the {@link StoreBuilder} used to obtain {@link StateStore state store} instances (one per shard)
+     * @param sourceName
+     *        the unique name of the internally added {@link #addSource(String, String...) source}
+     * @param keyDeserializer
+     *        the {@link Deserializer} for record keys
+     *        (can be {@code null} to use the default key deserializer from {@link StreamsConfig})
+     * @param valueDeserializer
+     *        the {@link Deserializer} for record values
+     *        (can be {@code null} to use the default value deserializer from {@link StreamsConfig})
+     * @param topic
+     *        the source topic to read the data from
+     * @param processorName
+     *        the unique name of the internally added
+     *        {@link #addProcessor(String, ProcessorSupplier, String...) processor} which maintains the state store
+     * @param stateUpdateSupplier
+     *        the supplier used to obtain {@link Processor} instances, which maintain the state store
+     *
+     * @return itself
+     *
+     * @throws TopologyException
+     *         if the {@link StoreBuilder#name() state store} was already added, or
+     *         if the source or processor names are not unique, or
+     *         if the source topic has already been registered by another
+     *         {@link #addSink(String, String, String...) source}, read-only state store, or
+     *         {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state store}
+     */
+    public synchronized <K, V> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                              final String sourceName,
+                                                              final Deserializer<K> keyDeserializer,
+                                                              final Deserializer<V> valueDeserializer,
+                                                              final String topic,
+                                                              final String processorName,
+                                                              final ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier) {
+        return addReadOnlyStateStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+    }
+
+    /**
+     * See {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)}.
+     */
+    public synchronized <K, V> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                              final String sourceName,
+                                                              final TimestampExtractor timestampExtractor,
+                                                              final Deserializer<K> keyDeserializer,
+                                                              final Deserializer<V> valueDeserializer,
```

Review Comment:
Keeping generic types here does make sense, as it allows us to enforce the same type for the key and value on the `Deserializer` and the `ProcessorSupplier` (there are a few other places like this). Given this one and the `addProcessor` case, I am wondering if we should just add generics everywhere and avoid `<?>`, just for code-style consistency, even if it does not buy us anything for type safety. Thoughts?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
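For readers following the generics discussion above, the type-safety benefit the reviewer describes can be sketched without Kafka on the classpath. The two interfaces below are simplified, hypothetical stand-ins for Kafka's `Deserializer` and `ProcessorSupplier` (not the real API); the point is only that sharing a type parameter across two arguments lets the compiler reject mismatched key/value types, which a `<?>` wildcard parameter (like the `StoreBuilder<?>` argument) cannot do:

```java
// Simplified stand-ins for the Kafka interfaces discussed above (NOT the real API).
interface Deserializer<T> {
    T deserialize(byte[] data);
}

interface ProcessorSupplier<KIn, VIn> {
    void process(KIn key, VIn value);
}

class SharedGenericsSketch {

    // Sharing <K, V> between both parameters forces the deserializer's output
    // type and the processor's input key type to line up at compile time.
    static <K, V> String addReadOnlyStateStore(final Deserializer<K> keyDeserializer,
                                               final ProcessorSupplier<K, V> stateUpdateSupplier) {
        return "registered";
    }

    public static void main(String[] args) {
        Deserializer<String> keyDeserializer = bytes -> new String(bytes);
        ProcessorSupplier<String, Long> supplier = (key, value) -> { };

        // Compiles: K = String is inferred consistently across both arguments.
        System.out.println(addReadOnlyStateStore(keyDeserializer, supplier));

        // Would NOT compile: passing a ProcessorSupplier<Integer, Long> here
        // makes K ambiguous (String vs. Integer), so the mismatch is caught
        // at compile time instead of surfacing as a runtime ClassCastException.
    }
}
```

With a wildcard such as `StoreBuilder<?>`, no such cross-parameter constraint exists, which is why adding generics there is purely a code-style question, as the comment notes.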