Hi,

Hard to diagnose. The new consumer should not affect the Streams app
though -- even if I am wondering why you need it in the first place.

> KafkaConsumer (with a UUID as group.id) that reads some historical data from 
> the input topic

Maybe using a GlobalKTable instead would be a better solution?
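
Something like this might work (just a rough sketch against the 0.10.2
API you are already using; the store name and the join logic are
placeholders, since I don't know what you want to do with the
historical records):

  KStreamBuilder builder = new KStreamBuilder();

  // A GlobalKTable materializes the full history of the topic on each
  // instance and is managed by Streams itself -- no extra consumer and
  // no extra group.id involved. "history-store" is a placeholder name.
  GlobalKTable<String, String> history =
      builder.globalTable(Serdes.String(), Serdes.String(),
                          Main.SOURCE_TOPIC, "history-store");

  builder.stream(Serdes.String(), Serdes.String(), Main.SOURCE_TOPIC)
         .join(history,
               (key, value) -> key,           // map stream record to table key
               (value, historic) -> value)    // placeholder joiner
         .to(Serdes.String(), Serdes.String(), Main.TARGET_TOPIC);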

> (i.e. I feed 1000 records into source topic and receive around 200 on the 
> target topic)

Are these the first 200 records, or are those 200 records "random ones"
from your input topic? How many partitions do your input and output
topics have?

> looks like a lot of rebalancing happens.

We recommend changing the following StreamsConfig values to improve
rebalance behavior:

> props.put(ProducerConfig.RETRIES_CONFIG, 10);  // <-- increase to 10 from the default of 0
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));  // <-- increase to "infinite" from the default of 300 s

We will change the default values accordingly in a future release, but
for now you should set these manually.
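
For completeness, a minimal sketch of where those two settings go
(assuming you add them to the same Properties object you already pass
into KafkaStreams -- Streams forwards unknown configs to its internal
producer and consumer):

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
  // ... serdes etc. as in your current setup ...

  // recommended overrides until the defaults change:
  props.put(ProducerConfig.RETRIES_CONFIG, 10);
  props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
            Integer.toString(Integer.MAX_VALUE));

  KafkaStreams streams = new KafkaStreams(builder, props);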


Hope this helps.


-Matthias

On 4/24/17 10:01 AM, Andreas Voss wrote:
> Hi, I have a simple streaming app that copies data from one topic to another, 
> so when I feed 1000 records into the source topic I receive 1000 records in 
> the target topic. The app also contains a transform() step which does nothing 
> except instantiate a KafkaConsumer (with a UUID as group.id) that reads some 
> historical data from the input topic. As soon as this consumer is in place, 
> the streaming app does not work anymore: records get lost (i.e. I feed 1000 
> records into the source topic and receive around 200 on the target topic) and 
> it's terribly slow - looks like a lot of rebalancing happens.
> 
> To me this looks like a bug, because the KStreamBuilder uses the application 
> id as group.id ("kafka-smurfing" in this case), while the transformer uses a 
> different one (a random UUID).
> 
> Full source code:
> 
> public class Main {
> 
>   public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
>   public static final String SOURCE_TOPIC = "transactions";
>   public static final String TARGET_TOPIC = "alerts";
> 
>   public static void main(String[] args) throws Exception {
> 
>     KStreamBuilder builder = new KStreamBuilder();
>     builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>            .transform(() -> new DebugTransformer())
>            .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
> 
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> 
>     KafkaStreams streams = new KafkaStreams(builder, props);
>     streams.start();
>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> 
>   }
> 
> }
> 
> public class DebugTransformer implements Transformer<String, String, KeyValue<String, String>> {
> 
>   private KafkaConsumer<String, String> consumer;
>   private ProcessorContext context;
> 
>   @Override
>   public void init(ProcessorContext context) {
>     this.context = context;
>     Properties props = new Properties();
>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>     props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>     consumer = new KafkaConsumer<>(props);
>   }
> 
>   @Override
>   public KeyValue<String, String> transform(String key, String value) {
>     TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, context.partition());
>     consumer.assign(Arrays.asList(partition));
>     consumer.seek(partition, 0);
>     consumer.poll(100);
>     return KeyValue.pair(key, value);
>   }
> 
>   @Override
>   public void close() {
>     consumer.close();
>   }
> 
>   @Override
>   public KeyValue<String, String> punctuate(long timestamp) {
>     return null;
>   }
> 
> }
> 
> Thanks for any hints,
> Andreas
> 
