Simon Dean created KAFKA-8689:
---------------------------------

             Summary: Cannot Name Join State Store Topics
                 Key: KAFKA-8689
                 URL: https://issues.apache.org/jira/browse/KAFKA-8689
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.0
            Reporter: Simon Dean


Performing a join on two KStreams produces two state store changelog topics. Currently, the names of these state store topics are auto-generated and cannot be overridden.

Example code:

 
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class JoinTopicNamesExample {

    public static void main(final String[] args) throws InterruptedException {
        new Thread(() -> {
            produce(args);
        }).run();

        new Thread(() -> {
            try {
                streams(args);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).run();
    }

    private static void produce(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
LongSerializer.class);

        KafkaProducer<String, Long> producer = new KafkaProducer<>(props);

        for (long i = 0; i < 10; i++) {
            producer.send(new ProducerRecord("left", Long.toString(i), i));
        }

        for (long i = 0; i < 10; i++) {
            producer.send(new ProducerRecord("right", Long.toString(i), i));
        }
    }

    private static void streams(String[] args) throws InterruptedException {
        final String bootstrapServers = args.length > 0 ? args[0] : 
"localhost:9092";
        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name.  The name must be unique 
in the Kafka cluster
        // against which the application is run.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"join-topic-names-example");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, 
"user-region-lambda-example-client");
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
        // Specify default (de)serializers for record keys and for record 
values.
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass().getName());
        // Records should be flushed every 10 seconds. This is less than the 
default
        // in order to keep this example interactive.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 
1000);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, Long> left = builder.stream("left", 
Consumed.with(stringSerde, longSerde));
        final KStream<String, Long> right = builder.stream("right", 
Consumed.with(stringSerde, longSerde));

        left.join(
                right,
                (value1, value2) -> value1 + value2,
                JoinWindows.of(Duration.ofHours(1)), 
                Joined.as("sum"));

        final KafkaStreams streams = new KafkaStreams(builder.build(), 
streamsConfiguration);

        streams.start();

        Thread.sleep(TimeUnit.MINUTES.toMillis(1));

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka 
Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}
{code}
 

 

Here are the topics produced by the example code:
 * join-topic-names-example-KSTREAM-JOINOTHER-0000000005-store-changelog
 * join-topic-names-example-KSTREAM-JOINTHIS-0000000004-store-changelog
 * left
 * right

In the example code above, a name is passed into the join with 
Joined.as("sum"). The "sum" name is ignored when Kafka Streams decides on 
the state store topic names. 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to