I just confirmed. `KafkaConsumer.close()` should be idempotent. It's a bug in the consumer.
https://issues.apache.org/jira/browse/KAFKA-5169


-Matthias

On 5/3/17 2:20 PM, Matthias J. Sax wrote:
> Yes, Streams might call "close" multiple times, as we assume it's an
> idempotent operation.
>
> Thus, if you close "your" Consumer in `DebugTransformer.close()`, the
> operation is not idempotent anymore, and thus fails.
> `KafkaConsumer.close()` is not idempotent :(
>
> You can just use a "try-catch" when you call `Consumer.close()` and
> swallow the exception. This should fix the issue.
>
> If you have only one Streams instance, this does not hit, as we don't
> call "close" twice.
>
>
> -Matthias
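Concretely, the suggested workaround could look like this in
`DebugTransformer.close()` -- a minimal sketch that catches the
IllegalStateException an already-closed consumer throws (see KAFKA-5169):

    @Override
    public void close() {
        try {
            consumer.close();
        } catch (final IllegalStateException e) {
            // KafkaConsumer.close() is not idempotent: Streams may call
            // Transformer.close() twice, and the second consumer.close()
            // throws because the consumer is already closed. Swallowing
            // the exception lets the task's state manager close cleanly
            // and release the lock on its state directory.
        }
    }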
> On 5/3/17 9:34 AM, Andreas Voss wrote:
>> Hi again,
>>
>> One more observation: the problem only occurs when the two application
>> instances are started one after the other with some delay in between
>> (7 seconds in my test setup). So the first instance had already started
>> to process the events that were in the queue when the second instance
>> came up. It looks like the second instance tries to initiate a
>> rebalance that fails.
>>
>> Thanks,
>> Andreas
>>
>>
>> -----Original Message-----
>> From: Narendra Kumar
>> Sent: Wednesday, May 3, 2017 13:11
>> To: Andreas Voss <[email protected]>; [email protected]
>> Cc: Wladimir Schmidt <[email protected]>; Christian Leiner
>> <[email protected]>
>> Subject: RE: Consumer with another group.id conflicts with streams()
>>
>> Hi Matthias,
>>
>> I enabled logs in the application and this is what I have observed:
>>
>> While rebalancing, ProcessorNode.close() is called twice, once from
>> StreamThread.suspendTasksAndState() and once from
>> StreamThread.closeNonAssignedSuspendedTasks(). DebugTransformer has an
>> instance field of type KafkaConsumer that it uses to do some lookups.
>> DebugTransformer.close() throws an exception when called the second
>> time (an IllegalStateException from the KafkaConsumer instance, because
>> it is already closed), so we fail to close the task's state manager
>> (i.e. the call to task.closeStateManager(true) fails). The StateManager
>> is then still holding the lock on the task's state directory. After the
>> rebalance, if the same task id is launched on the same application
>> instance but in a different thread, the task gets stuck because it
>> fails to acquire the lock on the task's state directory. The task is
>> stuck forever and can't process any data from then on. Depending on how
>> much data was on the partition at the time of the rebalance, we see a
>> different number of alerts.
>>
>> I have attached the log file for reference.
>>
>> Thanks,
>> Narendra
>>
>>
>> -----Original Message-----
>> From: Andreas Voss
>> Sent: Tuesday, April 25, 2017 1:27 PM
>> To: [email protected]
>> Cc: Narendra Kumar <[email protected]>; Wladimir Schmidt
>> <[email protected]>; Christian Leiner <[email protected]>
>> Subject: Re: Consumer with another group.id conflicts with streams()
>>
>> Hi Matthias,
>>
>> thank you for your response. Here are some more details:
>>
>> - My input and output topics have 6 partitions each.
>> - My application instances run from Docker images, so there is no state
>>   left over from a previous run.
>> - I have a single Docker container running Kafka + ZooKeeper
>>   (spotify/kafka).
>> - When the second consumer is in place, I receive a random number of
>>   records in the target topic (e.g. I send 1000 records and receive 439
>>   on the first run and 543 on the second run).
>> - The problem only occurs if two instances of the application are
>>   running. If I start only one instance, it is slow, but when I send
>>   1000 records I also receive 1000 records. I think this is another
>>   indicator of a bug, because a streaming app should behave the same
>>   regardless of whether one or two instances are running.
>> - I added the properties you suggested, but the behavior did not
>>   change.
>>
>> I think this is a bug; consumers of different groups should not
>> interact with each other. Should I submit a bug report, and if so, any
>> suggestions on how to do that?
>>
>> Andreas
>>
>>
>> -----Original Message-----
>> From: Matthias J. Sax [mailto:[email protected]]
>> Sent: Monday, April 24, 2017 19:18
>> To: [email protected]
>> Subject: Re: Consumer with another group.id conflicts with streams()
>>
>> Hi,
>>
>> hard to diagnose. The new consumer should not affect the Streams app
>> though -- even if I am wondering why you need it.
>>
>>> KafkaConsumer (with a UUID as group.id) that reads some historical
>>> data from input topic
>>
>> Maybe using a GlobalKTable instead might be a better solution?
>>
>>> (i.e. I feed 1000 records into source topic and receive around 200 on
>>> the target topic)
>>
>> Are these the first 200 records? Or are those 200 records "random ones"
>> from your input topic? How many partitions do you have for the input
>> and output topics?
>>
>>> looks like a lot of rebalancing happens.
>>
>> We recommend changing the following StreamsConfig values to improve
>> rebalance behavior:
>>
>>> props.put(ProducerConfig.RETRIES_CONFIG, 10); <---- increase to 10
>>> from default of 0
>>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>> Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity
>>> from default of 300 s
>>
>> We will change the default values accordingly in a future release, but
>> for now you should set these manually.
>>
>>
>> Hope this helps.
>>
>>
>> -Matthias
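In code, the two recommended overrides would be added to the Streams
Properties like this (a minimal sketch; the constant names are from the
0.10.x Kafka clients, and the surrounding settings are the ones from Main
in the original message below):

    Properties props = new Properties();
    // ... application id, bootstrap servers, serdes as in Main below ...

    // let the internal producer retry transient send failures
    // (the 0.10.x default is 0, i.e. fail immediately)
    props.put(ProducerConfig.RETRIES_CONFIG, 10);

    // effectively disable the max.poll.interval.ms timeout (default 300 s)
    // so slow processing does not kick the consumer out of the group and
    // trigger yet another rebalance
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
            Integer.toString(Integer.MAX_VALUE));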
>> On 4/24/17 10:01 AM, Andreas Voss wrote:
>>> Hi, I have a simple streaming app that copies data from one topic to
>>> another, so when I feed 1000 records into the source topic I receive
>>> 1000 records in the target topic. The app also contains a transform()
>>> step that does nothing except instantiate a KafkaConsumer (with a UUID
>>> as group.id) that reads some historical data from the input topic. As
>>> soon as this consumer is in place, the streaming app no longer works:
>>> records get lost (i.e. I feed 1000 records into the source topic and
>>> receive around 200 on the target topic) and it's terribly slow -- it
>>> looks like a lot of rebalancing happens.
>>>
>>> To me this looks like a bug, because the KStreamBuilder uses the
>>> application id as group.id ("kafka-smurfing" in this case), and the
>>> transformer uses a different one (a UUID).
>>>
>>> Full source code:
>>>
>>> public class Main {
>>>
>>>     public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
>>>     public static final String SOURCE_TOPIC = "transactions";
>>>     public static final String TARGET_TOPIC = "alerts";
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         KStreamBuilder builder = new KStreamBuilder();
>>>         builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>>>                .transform(() -> new DebugTransformer())
>>>                .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
>>>
>>>         Properties props = new Properties();
>>>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
>>>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>>>         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>>>         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>
>>>         KafkaStreams streams = new KafkaStreams(builder, props);
>>>         streams.start();
>>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>     }
>>> }
>>>
>>> public class DebugTransformer implements Transformer<String, String, KeyValue<String, String>> {
>>>
>>>     private KafkaConsumer<String, String> consumer;
>>>     private ProcessorContext context;
>>>
>>>     @Override
>>>     public void init(ProcessorContext context) {
>>>         this.context = context;
>>>         Properties props = new Properties();
>>>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>>>         props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
>>>         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>>         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>>>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>>>         consumer = new KafkaConsumer<>(props);
>>>     }
>>>
>>>     @Override
>>>     public KeyValue<String, String> transform(String key, String value) {
>>>         // re-read the task's input partition from the beginning on
>>>         // every record
>>>         TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, context.partition());
>>>         consumer.assign(Arrays.asList(partition));
>>>         consumer.seek(partition, 0);
>>>         consumer.poll(100);
>>>         return KeyValue.pair(key, value);
>>>     }
>>>
>>>     @Override
>>>     public void close() {
>>>         consumer.close();
>>>     }
>>>
>>>     @Override
>>>     public KeyValue<String, String> punctuate(long timestamp) {
>>>         return null;
>>>     }
>>> }
>>>
>>> Thanks for any hints,
>>> Andreas
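As for the GlobalKTable alternative Matthias suggests above, a minimal
sketch of what it might look like with the same API generation used in this
thread. The "history" topic, the "history-store" store name, and the
pass-through join logic are hypothetical, since the original lookup logic
isn't shown; the historical data is assumed to live in its own topic,
because a single topic cannot back both a KStream source and a GlobalKTable
source in one topology:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class GlobalLookupMain {

        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            // Historical data materialized as a GlobalKTable: Streams keeps
            // a complete, continuously updated copy on every instance, so no
            // second KafkaConsumer (and no second consumer group) is needed
            // inside a Transformer.
            GlobalKTable<String, String> history =
                builder.globalTable(Serdes.String(), Serdes.String(),
                                    "history", "history-store");

            builder.stream(Serdes.String(), Serdes.String(), "transactions")
                   // inner join against the global table; leftJoin() would
                   // also keep records that have no match in "history"
                   .join(history,
                         (key, value) -> key,          // lookup key per record
                         (value, historical) -> value) // placeholder combiner
                   .to(Serdes.String(), Serdes.String(), "alerts");

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

Because the global store is replicated to every instance, the lookup no
longer triggers the extra consumer group or the rebalance trouble described
earlier in the thread.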