Simon Dean created KAFKA-8689:
---------------------------------
Summary: Cannot Name Join State Store Topics
Key: KAFKA-8689
URL: https://issues.apache.org/jira/browse/KAFKA-8689
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.3.0
Reporter: Simon Dean
Performing a join on two KStreams produces two state store topics. Currently
the names of the state store topics are auto-generated and cannot be overridden.
Example code:
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class JoinTopicNamesExample {
public static void main(final String[] args) throws InterruptedException {
new Thread(() -> {
produce(args);
}).run();
new Thread(() -> {
try {
streams(args);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).run();
}
private static void produce(String[] args) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);
KafkaProducer<String, Long> producer = new KafkaProducer<>(props);
for (long i = 0; i < 10; i++) {
producer.send(new ProducerRecord("left", Long.toString(i), i));
}
for (long i = 0; i < 10; i++) {
producer.send(new ProducerRecord("right", Long.toString(i), i));
}
}
private static void streams(String[] args) throws InterruptedException {
final String bootstrapServers = args.length > 0 ? args[0] :
"localhost:9092";
final Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique
in the Kafka cluster
// against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
"join-topic-names-example");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,
"user-region-lambda-example-client");
// Where to find Kafka broker(s).
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
// Specify default (de)serializers for record keys and for record
values.
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Long().getClass().getName());
// Records should be flushed every 10 seconds. This is less than the
default
// in order to keep this example interactive.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 *
1000);
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Long> left = builder.stream("left",
Consumed.with(stringSerde, longSerde));
final KStream<String, Long> right = builder.stream("right",
Consumed.with(stringSerde, longSerde));
left.join(
right,
(value1, value2) -> value1 + value2,
JoinWindows.of(Duration.ofHours(1)),
Joined.as("sum"));
final KafkaStreams streams = new KafkaStreams(builder.build(),
streamsConfiguration);
streams.start();
Thread.sleep(TimeUnit.MINUTES.toMillis(1));
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka
Streams
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
{code}
Here are the topics produced by the example code:
* join-topic-names-example-KSTREAM-JOINOTHER-0000000005-store-changelog
* join-topic-names-example-KSTREAM-JOINTHIS-0000000004-store-changelog
* left
* right
In the example code above, a material name is passed into the join with
Joined.as("sum"). The "sum" name is ignored when Kafka Streams decides on
the state store topic names.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)