Hi, I have a simple streaming app that copies data from one topic to another:
when I feed 1000 records into the source topic, I receive 1000 records in the
target topic. The app also contains a transform() step which does nothing
except instantiate a KafkaConsumer (with a random UUID as group.id) that reads
some historical data from the input topic. As soon as this consumer is in
place, the streaming app no longer works correctly: records get lost (I feed
1000 records into the source topic and receive only around 200 on the target
topic) and it becomes terribly slow - it looks like a lot of rebalancing is
happening.
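A back-of-envelope sketch of why I suspect this (an assumption on my part, not
a measurement): transform() calls consumer.poll(100) once per record, and each
poll() may block up to its full timeout, so processing inside the streams
thread can stall for a long time in total:

```java
// Back-of-envelope estimate (assumption, not a measurement): transform()
// calls consumer.poll(100) once per record, and poll() may block up to
// its timeout before returning.
public class PollCostEstimate {

    // Worst-case total time spent blocked in poll() across all records.
    static long worstCaseBlockedMs(int records, long pollTimeoutMs) {
        return records * pollTimeoutMs;
    }

    public static void main(String[] args) {
        // 1000 records fed into the source topic, 100 ms poll timeout
        // as used in DebugTransformer.transform() below.
        System.out.println(worstCaseBlockedMs(1000, 100) + " ms");
        // 100000 ms, i.e. up to ~100 s of stalls inside the streams thread
    }
}
```

That kind of per-record stall seems like it could easily exceed the consumer
timeouts and trigger the rebalancing I'm seeing, but I may be wrong about the
mechanism.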
To me this looks like a bug, because the KStreamBuilder-based app uses the
application id as its group.id ("kafka-smurfing" in this case), while the
transformer's consumer uses a different one (a random UUID) - so the two
consumers should not interfere with each other.
Full source code:
public class Main {

    public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
    public static final String SOURCE_TOPIC = "transactions";
    public static final String TARGET_TOPIC = "alerts";

    public static void main(String[] args) throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
               .transform(() -> new DebugTransformer())
               .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
public class DebugTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private KafkaConsumer<String, String> consumer;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumer = new KafkaConsumer<>(props);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // re-read the current input partition from the beginning on every record
        TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, context.partition());
        consumer.assign(Arrays.asList(partition));
        consumer.seek(partition, 0);
        consumer.poll(100);
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() {
        consumer.close();
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        return null;
    }
}
Thanks for any hints,
Andreas