Hi Matthias,

thank you for your response. Here are some more details:

- my input and output topics have 6 partitions each.
- my application instances run from docker images, so there is no state left 
over from a previous run
- I have a single docker container running kafka + zookeeper (spotify/kafka).
- when the second consumer is in place, I receive a random number of records in 
the target topic (e.g. I send 1000 records and receive 439 on first run and 543 
on second run)
- the problem only occurs if two instances of the application are running. If I 
start only one instance, it is slow, but when I send 1000 records I also 
receive 1000 records. I think this is also an indicator of a bug, because a 
streaming app should behave the same regardless of whether one or two 
instances are running.
- I added the properties you suggested, but behavior did not change.

I think this is a bug: consumers of different groups should not interact with 
each other. Should I submit a bug report and, if so, do you have any 
suggestions on how to do that?


hard to diagnose. The new consumer should not affect the Streams app though -- 
even if I am wondering why you need it.

> KafkaConsumer (with a UUID as group.id) that reads some historical
> data from input topic

Maybe using GlobalKTable instead might be a better solution?

> (i.e. I feed 1000 records into source topic and receive around 200 on
> the target topic)

Are these the first 200 records? Or are those 200 records "random ones"
from your input topic? How many partitions do you have for input/output topic?
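
One way to answer the first question is to put a sequence number into each test record and compare what arrives on the target topic against what was sent. The following is only a sketch under that assumption; the class and method names (LossPattern, missing, isPrefix) are hypothetical helpers, not part of Kafka. Sets are used because ordering across the 6 partitions is not guaranteed.

```java
import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper: assumes every test record carries a sequence number 0..sent-1.
public class LossPattern {

    // Sequence numbers in [0, sent) that never showed up on the target topic.
    public static SortedSet<Integer> missing(int sent, Collection<Integer> received) {
        SortedSet<Integer> gaps = new TreeSet<>();
        for (int i = 0; i < sent; i++) {
            gaps.add(i);
        }
        gaps.removeAll(received);
        return gaps;
    }

    // True if the received numbers are exactly 0..k-1 for some k, i.e. a clean prefix.
    public static boolean isPrefix(Collection<Integer> received) {
        Set<Integer> distinct = new TreeSet<>(received); // sorted, duplicates dropped
        int expected = 0;
        for (int n : distinct) {
            if (n != expected++) {
                return false;
            }
        }
        return true;
    }
}
```

If isPrefix returns false and the missing numbers are scattered, the loss is not simply "everything after record N".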

> looks like a lot of rebalancing happens.

We recommend changing the StreamsConfig values as follows to improve rebalance behavior:

> props.put(ProducerConfig.RETRIES_CONFIG, 10);  <---- increase to 10
> from default of 0
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity
> from default of 300 s

We will change the default values accordingly in a future release, but for now 
you should set these manually.
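
A minimal sketch of setting the two values quoted above by hand. To stay free of the Kafka dependency it uses the plain string keys behind ProducerConfig.RETRIES_CONFIG ("retries") and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG ("max.poll.interval.ms"). The class and method names are hypothetical; in a real app these entries would go into the same Properties object that is passed to KafkaStreams.

```java
import java.util.Properties;

// Hypothetical helper that collects the two recommended overrides.
public class StreamsOverrides {

    public static Properties resilienceOverrides() {
        Properties props = new Properties();
        // Same key as ProducerConfig.RETRIES_CONFIG: retry failed sends up to 10 times.
        props.put("retries", 10);
        // Same key as ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG: effectively no poll timeout.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be checked against the app's configuration.
        resilienceOverrides().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```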

Hope this helps.


On 4/24/17 10:01 AM, Andreas Voss wrote:
> Hi, I have a simple streaming app that copies data from one topic to another, 
> so when I feed 1000 records into source topic I receive 1000 records in the 
> target topic. Also the app contains a transform() step which does nothing, 
> except instantiating a KafkaConsumer (with a UUID as group.id) that reads 
> some historical data from input topic. As soon as this consumer is in place, 
> the streaming app does not work anymore, records get lost (i.e. I feed 1000 
> records into source topic and receive around 200 on the target topic) and 
> it's terribly slow - looks like a lot of rebalancing happens.
> To me this looks like a bug, because the KStreamBuilder uses the application 
> id as group.id ("kafka-smurfing" in this case), and the transformer uses a 
> different one (uuid).
> Full source code:
> public class Main {
>   public static final String BOOTSTRAP_SERVERS = "";
>   public static final String SOURCE_TOPIC = "transactions";
>   public static final String TARGET_TOPIC =  "alerts";
>   public static void main(String[] args) throws Exception {
>     KStreamBuilder builder = new KStreamBuilder();
>     builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>            .transform(() -> new DebugTransformer())
>            .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
>     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass().getName());
>     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass().getName());
>     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>     KafkaStreams streams = new KafkaStreams(builder, props);
>     streams.start();
>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>   }
> }
> public class DebugTransformer implements Transformer<String, String,
> KeyValue<String, String>> {
>   private KafkaConsumer<String, String> consumer;
>   private ProcessorContext context;
>   @Override
>   public void init(ProcessorContext context) {
>     this.context = context;
>     Properties props = new Properties();
>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>     props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>     consumer = new KafkaConsumer<>(props);
>   }
>   @Override
>   public KeyValue<String, String> transform(String key, String value) {
>     TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, 
> context.partition());
>     consumer.assign(Arrays.asList(partition));
>     consumer.seek(partition, 0);
>     consumer.poll(100);
>     return KeyValue.pair(key, value);
>   }
>   @Override
>   public void close() {
>     consumer.close();
>   }
>   @Override
>   public KeyValue<String, String> punctuate(long timestamp) {
>     return null;
>   }
> }
> Thanks for any hints,
> Andreas
