Simon Dean created KAFKA-8689: --------------------------------- Summary: Cannot Name Join State Store Topics Key: KAFKA-8689 URL: https://issues.apache.org/jira/browse/KAFKA-8689 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.3.0 Reporter: Simon Dean
Performing a join on two KStreams produces two state store changelog topics. Currently the names of these state store topics are auto-generated and cannot be overridden. Example code: {code:java} import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; public class JoinTopicNamesExample { public static void main(final String[] args) throws InterruptedException { new Thread(() -> { produce(args); }).run(); new Thread(() -> { try { streams(args); } catch (InterruptedException e) { e.printStackTrace(); } }).run(); } private static void produce(String[] args) { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); KafkaProducer<String, Long> producer = new KafkaProducer<>(props); for (long i = 0; i < 10; i++) { producer.send(new ProducerRecord("left", 
Long.toString(i), i)); } for (long i = 0; i < 10; i++) { producer.send(new ProducerRecord("right", Long.toString(i), i)); } } private static void streams(String[] args) throws InterruptedException { final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092"; final Properties streamsConfiguration = new Properties(); // Give the Streams application a unique name. The name must be unique in the Kafka cluster // against which the application is run. streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-topic-names-example"); streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "user-region-lambda-example-client"); // Where to find Kafka broker(s). streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // Specify default (de)serializers for record keys and for record values. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); // Records should be flushed every 10 seconds. This is less than the default // in order to keep this example interactive. 
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); final Serde<String> stringSerde = Serdes.String(); final Serde<Long> longSerde = Serdes.Long(); final StreamsBuilder builder = new StreamsBuilder(); final KStream<String, Long> left = builder.stream("left", Consumed.with(stringSerde, longSerde)); final KStream<String, Long> right = builder.stream("right", Consumed.with(stringSerde, longSerde)); left.join( right, (value1, value2) -> value1 + value2, JoinWindows.of(Duration.ofHours(1)), Joined.as("sum")); final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); streams.start(); Thread.sleep(TimeUnit.MINUTES.toMillis(1)); // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } } {code} Here are the topics produced by the example code: * join-topic-names-example-KSTREAM-JOINOTHER-0000000005-store-changelog * join-topic-names-example-KSTREAM-JOINTHIS-0000000004-store-changelog * left * right In the example code above, the name "sum" is passed to the join via Joined.as("sum"). However, this name is ignored when Kafka Streams generates the names of the state store changelog topics. -- This message was sent by Atlassian JIRA (v7.6.14#76016)