Hi, I have a simple streaming app that copies data from one topic to another: 
when I feed 1000 records into the source topic, I receive 1000 records in the 
target topic. The app also contains a transform() step that does nothing 
except instantiate a KafkaConsumer (with a random UUID as group.id) which 
reads some historical data from the input topic. As soon as this consumer is 
in place, the streaming app no longer works correctly: records get lost (i.e. 
I feed 1000 records into the source topic and receive only around 200 in the 
target topic) and it is terribly slow - it looks like a lot of rebalancing is 
happening.
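
In case it helps to verify the rebalancing suspicion: a state listener along 
these lines should make the state transitions visible (just a sketch, 
assuming a Streams version that offers KafkaStreams#setStateListener; 
register it before streams.start()):

    // Sketch: log state transitions of the Streams instance; frequent
    // REBALANCING states would confirm the suspicion above.
    streams.setStateListener((newState, oldState) ->
        System.out.println("state change: " + oldState + " -> " + newState));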

To me this looks like a bug: the KStreamBuilder uses the application id as 
group.id ("kafka-smurfing" in this case) while the transformer's consumer 
uses a different one (a random UUID), so the two consumers should not 
interfere with each other.

Full source code:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class Main {

  public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
  public static final String SOURCE_TOPIC = "transactions";
  public static final String TARGET_TOPIC = "alerts";

  public static void main(String[] args) throws Exception {

    // Topology: copy every record from the source topic to the target topic,
    // with a (supposedly) no-op transform step in between.
    KStreamBuilder builder = new KStreamBuilder();
    builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
           .transform(() -> new DebugTransformer())
           .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

  }

}
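
For completeness, a plain producer like the following is enough to feed the 
1000 test records (a sketch; the Feeder class name is just an illustration):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Feeder {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Push 1000 records into the source topic.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 1000; i++) {
        producer.send(new ProducerRecord<>(Main.SOURCE_TOPIC, "key-" + i, "value-" + i));
      }
    }
  }

}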

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DebugTransformer implements Transformer<String, String, KeyValue<String, String>> {

  private KafkaConsumer<String, String> consumer;
  private ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumer = new KafkaConsumer<>(props);
  }

  @Override
  public KeyValue<String, String> transform(String key, String value) {
    // Re-read historical data from the partition this task is processing:
    // assign the inner consumer to it, seek to the beginning, and poll once.
    TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, context.partition());
    consumer.assign(Arrays.asList(partition));
    consumer.seek(partition, 0);
    consumer.poll(100);
    return KeyValue.pair(key, value);
  }

  @Override
  public void close() {
    consumer.close();
  }

  @Override
  public KeyValue<String, String> punctuate(long timestamp) {
    return null;
  }

}

Thanks for any hints,
Andreas
