Hi,

I have a simple streaming app that copies data from one topic to another: when I feed 1,000 records into the source topic, I receive 1,000 records in the target topic. The app also contains a transform() step that does nothing except instantiate a KafkaConsumer (with a random UUID as group.id) that reads some historical data from the input topic. As soon as this consumer is in place, the streaming app no longer works correctly: records get lost (I feed 1,000 records into the source topic and receive only around 200 in the target topic) and everything becomes terribly slow; it looks like a lot of rebalancing is happening.
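For reference, the 1,000 test records are fed with a small standalone producer along these lines (a minimal sketch, assuming string keys and values; the TestFeeder class name and the "key-N"/"value-N" payloads are just illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestFeeder {

    // Build the n test records up front; pure, no broker connection needed.
    static List<ProducerRecord<String, String>> buildRecords(String topic, int n) {
        List<ProducerRecord<String, String>> records = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            records.add(new ProducerRecord<>(topic, "key-" + i, "value-" + i));
        }
        return records;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.99.100:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer after all sends are flushed
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (ProducerRecord<String, String> record : buildRecords("transactions", 1000)) {
                producer.send(record);
            }
            producer.flush(); // make sure all 1,000 records actually reach the broker
        }
    }
}
```

The counting on the target topic side is done the same way with a plain consumer, so I am fairly confident the ~200 figure is real and not a measurement artifact.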
To me this looks like a bug, because the KStreamBuilder uses the application id as group.id ("kafka-smurfing" in this case), while the transformer's consumer uses a different one (a random UUID), so the two should not interfere with each other's group management.

Full source code:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class Main {

    public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
    public static final String SOURCE_TOPIC = "transactions";
    public static final String TARGET_TOPIC = "alerts";

    public static void main(String[] args) throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
               .transform(() -> new DebugTransformer())
               .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DebugTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private KafkaConsumer<String, String> consumer;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
        // random group.id so this consumer does not join the Streams app's group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumer = new KafkaConsumer<>(props);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // re-read the task's partition from the beginning for every record
        TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, context.partition());
        consumer.assign(Arrays.asList(partition));
        consumer.seek(partition, 0);
        consumer.poll(100);
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() {
        consumer.close();
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        return null;
    }
}

Thanks for any hints,
Andreas