[ https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14284963#comment-14284963 ]
Neha Narkhede edited comment on KAFKA-1886 at 1/26/15 1:25 AM: --------------------------------------------------------------- If interested, I hacked an existing test for this. {code} def testConsumerEmptyTopic() { val newTopic = "new-topic" TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) val thread = new Thread { override def run { System.out.println("Starting the fetch") val start = System.currentTimeMillis() try { val fetchResponse = consumer.fetch(new FetchRequestBuilder().minBytes(100000).maxWait(3000).addFetch(newTopic, 0, 0, 10000).build()) } catch { case e: Throwable =>{ val end = System.currentTimeMillis() System.out.println("Caught exception" + e + ". Took " + (end - start)); System.out.println("Fetch interrupted " + Thread.currentThread().isInterrupted) } } } } thread.start() Thread.sleep(1000) thread.interrupt() thread.join() System.out.println("Ending test") } {code} was (Author: auradkar): If interested, I hacked an existing test for this. def testConsumerEmptyTopic() { val newTopic = "new-topic" TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) val thread = new Thread { override def run { System.out.println("Starting the fetch") val start = System.currentTimeMillis() try { val fetchResponse = consumer.fetch(new FetchRequestBuilder().minBytes(100000).maxWait(3000).addFetch(newTopic, 0, 0, 10000).build()) } catch { case e: Throwable =>{ val end = System.currentTimeMillis() System.out.println("Caught exception" + e + ". Took " + (end - start)); System.out.println("Fetch interrupted " + Thread.currentThread().isInterrupted) } } } } thread.start() Thread.sleep(1000) thread.interrupt() thread.join() System.out.println("Ending test") } > SimpleConsumer swallowing ClosedByInterruptException > ---------------------------------------------------- > > Key: KAFKA-1886 > URL: https://issues.apache.org/jira/browse/KAFKA-1886 > Project: Kafka > Issue Type: Bug > Components: producer > Reporter: Aditya A Auradkar > Assignee: Jun Rao > Attachments: KAFKA-1886.patch > > > This issue was originally reported by a Samza developer. I've included an > exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on > my dev setup. > From: criccomi > Hey all, > Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to > interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches > Throwable in its sendRequest method [2]. I'm wondering: if > blockingChannel.send/receive throws a ClosedByInterruptException > when the thread is interrupted, what happens? It looks like sendRequest will > catch the exception (which I > think clears the thread's interrupted flag), and then retries the send. If > the send succeeds on the retry, I think that the ClosedByInterruptException > exception is effectively swallowed, and the BrokerProxy will continue > fetching messages as though its thread was never interrupted. > Am I misunderstanding how things work? > Cheers, > Chris > [1] > https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126 > [2] > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75 -- This message was sent by Atlassian JIRA (v6.3.4#6332)