Hi, this is hard to diagnose remotely. The new consumer should not affect the Streams app though -- even if I do wonder why you need it.
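If the inner consumer is only there to look up historical records from the input topic, a GlobalKTable join might do the job without a second consumer. A rough sketch against the same 0.10.2-era API as the code below -- the store name and the join logic are placeholders, not a tested implementation:

```java
KStreamBuilder builder = new KStreamBuilder();

// Materialize the history as a GlobalKTable; Streams manages the underlying
// consumer and keeps a full local copy of the topic on every instance.
GlobalKTable<String, String> history =
    builder.globalTable(Serdes.String(), Serdes.String(), SOURCE_TOPIC, "history-store");

// Join each incoming record against the table instead of re-reading the
// topic with a separately created KafkaConsumer.
builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
       .leftJoin(history,
                 (key, value) -> key,                       // map record key to table key
                 (value, historic) -> value + "|" + historic) // placeholder join logic
       .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
```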
> KafkaConsumer (with a UUID as group.id) that reads some historical data
> from input topic

Maybe using GlobalKTable instead might be a better solution?

> (i.e. I feed 1000 records into source topic and receive around 200 on the
> target topic)

Are these the first 200 records? Or are those 200 records "random ones" from your input topic? How many partitions do you have for the input/output topics?

> looks like a lot of rebalancing happens.

We recommend changing the following StreamsConfig values to improve rebalance behavior:

props.put(ProducerConfig.RETRIES_CONFIG, 10); // increase to 10 from default of 0
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); // increase to infinity from default of 300 s

We will change the default values accordingly in a future release, but for now you should set this manually.

Hope this helps.

-Matthias

On 4/24/17 10:01 AM, Andreas Voss wrote:
> Hi, I have a simple streaming app that copies data from one topic to
> another, so when I feed 1000 records into the source topic I receive 1000
> records in the target topic. The app also contains a transform() step which
> does nothing, except instantiating a KafkaConsumer (with a UUID as
> group.id) that reads some historical data from the input topic. As soon as
> this consumer is in place, the streaming app does not work anymore: records
> get lost (i.e. I feed 1000 records into the source topic and receive around
> 200 on the target topic) and it's terribly slow -- it looks like a lot of
> rebalancing happens.
>
> To me this looks like a bug, because the KStreamBuilder uses the
> application id as group.id ("kafka-smurfing" in this case), and the
> transformer uses a different one (uuid).
>
> Full source code:
>
> public class Main {
>
>   public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
>   public static final String SOURCE_TOPIC = "transactions";
>   public static final String TARGET_TOPIC = "alerts";
>
>   public static void main(String[] args) throws Exception {
>
>     KStreamBuilder builder = new KStreamBuilder();
>     builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>            .transform(() -> new DebugTransformer())
>            .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
>
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
>     KafkaStreams streams = new KafkaStreams(builder, props);
>     streams.start();
>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>   }
> }
>
> public class DebugTransformer implements Transformer<String, String, KeyValue<String, String>> {
>
>   private KafkaConsumer<String, String> consumer;
>   private ProcessorContext context;
>
>   @Override
>   public void init(ProcessorContext context) {
>     this.context = context;
>     Properties props = new Properties();
>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>     props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>     consumer = new KafkaConsumer<>(props);
>   }
>
>   @Override
>   public KeyValue<String, String> transform(String key, String value) {
>     TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, context.partition());
>     consumer.assign(Arrays.asList(partition));
>     consumer.seek(partition, 0);
>     consumer.poll(100);
>     return KeyValue.pair(key, value);
>   }
>
>   @Override
>   public void close() {
>     consumer.close();
>   }
>
>   @Override
>   public KeyValue<String, String> punctuate(long timestamp) {
>     return null;
>   }
> }
>
> Thanks for any hints,
> Andreas
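For convenience, here are the two overrides recommended above collected in one fragment. Plain string keys are used so it stands alone without the Kafka client classes on the classpath; the constants ProducerConfig.RETRIES_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG resolve to exactly these strings:

```java
import java.util.Properties;

public class RebalanceConfig {
    /** Suggested StreamsConfig overrides to reduce spurious rebalances. */
    public static Properties streamsOverrides() {
        Properties props = new Properties();
        // "retries" == ProducerConfig.RETRIES_CONFIG: raise from the default
        // of 0 so transient broker errors during a rebalance are retried.
        props.put("retries", "10");
        // "max.poll.interval.ms" == ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG:
        // effectively infinite, so long processing pauses between poll() calls
        // do not kick the thread out of the consumer group.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties p = streamsOverrides();
        System.out.println(p.getProperty("retries"));
        System.out.println(p.getProperty("max.poll.interval.ms"));
    }
}
```

These can simply be added to the same Properties object passed to KafkaStreams, since Streams forwards unknown producer/consumer keys to its internal clients.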