[ https://issues.apache.org/jira/browse/KAFKA-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bill Bejeck resolved KAFKA-8689. -------------------------------- Resolution: Duplicate Duplicate of https://issues.apache.org/jira/browse/KAFKA-8558 > Cannot Name Join State Store Topics > ----------------------------------- > > Key: KAFKA-8689 > URL: https://issues.apache.org/jira/browse/KAFKA-8689 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.3.0 > Reporter: Simon Dean > Priority: Major > > Performing a join on two KStreams, produces two state store topics. > Currently the names state store topics are auto generated and cannot be > overridden. > Example code: > > {code:java} > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.ProducerConfig; > import org.apache.kafka.clients.producer.ProducerRecord; > import org.apache.kafka.common.serialization.LongSerializer; > import org.apache.kafka.common.serialization.Serde; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.common.serialization.StringSerializer; > import org.apache.kafka.streams.KafkaStreams; > import org.apache.kafka.streams.StreamsBuilder; > import org.apache.kafka.streams.StreamsConfig; > import org.apache.kafka.streams.kstream.Consumed; > import org.apache.kafka.streams.kstream.JoinWindows; > import org.apache.kafka.streams.kstream.Joined; > import org.apache.kafka.streams.kstream.KStream; > import java.time.Duration; > import java.util.HashMap; > import java.util.Map; > import java.util.Properties; > import java.util.concurrent.TimeUnit; > public class JoinTopicNamesExample { > public static void main(final String[] args) throws InterruptedException { > new Thread(() -> { > produce(args); > }).run(); > new Thread(() -> { > try { > streams(args); > } catch (InterruptedException e) { > e.printStackTrace(); > } > }).run(); > } > private static void produce(String[] args) { > Map<String, Object> props = new HashMap<>(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > props.put(ProducerConfig.RETRIES_CONFIG, 0); > props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); > props.put(ProducerConfig.LINGER_MS_CONFIG, 1); > props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); > props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class); > props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > LongSerializer.class); > KafkaProducer<String, Long> producer = new KafkaProducer<>(props); > for (long i = 0; i < 10; i++) { > producer.send(new ProducerRecord("left", Long.toString(i), i)); > } > for (long i = 0; i < 10; i++) { > producer.send(new ProducerRecord("right", Long.toString(i), i)); > } > } > private static void streams(String[] args) throws InterruptedException { > final String bootstrapServers = args.length > 0 ? args[0] : > "localhost:9092"; > final Properties streamsConfiguration = new Properties(); > // Give the Streams application a unique name. The name must be > unique in the Kafka cluster > // against which the application is run. > streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, > "join-topic-names-example"); > streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, > "user-region-lambda-example-client"); > // Where to find Kafka broker(s). > streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > bootstrapServers); > // Specify default (de)serializers for record keys and for record > values. > > streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > > streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > Serdes.Long().getClass().getName()); > // Records should be flushed every 10 seconds. This is less than the > default > // in order to keep this example interactive. > streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 > * 1000); > final Serde<String> stringSerde = Serdes.String(); > final Serde<Long> longSerde = Serdes.Long(); > final StreamsBuilder builder = new StreamsBuilder(); > final KStream<String, Long> left = builder.stream("left", > Consumed.with(stringSerde, longSerde)); > final KStream<String, Long> right = builder.stream("right", > Consumed.with(stringSerde, longSerde)); > left.join( > right, > (value1, value2) -> value1 + value2, > JoinWindows.of(Duration.ofHours(1)), > Joined.as("sum")); > final KafkaStreams streams = new KafkaStreams(builder.build(), > streamsConfiguration); > streams.start(); > Thread.sleep(TimeUnit.MINUTES.toMillis(1)); > // Add shutdown hook to respond to SIGTERM and gracefully close Kafka > Streams > Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); > } > } > {code} > > > Here are the topics produce by the example code: > * join-topic-names-example-KSTREAM-JOINOTHER-0000000005-store-changelog > * join-topic-names-example-KSTREAM-JOINTHIS-0000000004-store-changelog > * left > * right > In the example code above, a material name is passed into the Join with > Joined.as("sum"). The "sum" name is ignored when the Kafka Stream decides on > the state store topic names. -- This message was sent by Atlassian JIRA (v7.6.14#76016)