[
https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692411#comment-13692411
]
Alexey Ozeritskiy commented on KAFKA-937:
-----------------------------------------
kafka.tools.ConsumerOffsetChecker uses SimpleConsumer for OffsetRequest
To reproduce just make git pull and run
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group
--zkconnect zk-servers --topic topic
The problem is in the following diff:
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index bdeee91..1c28328 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -37,6 +37,7 @@ class SimpleConsumer(val host: String,
private val blockingChannel = new BlockingChannel(host, port, bufferSize,
BlockingChannel.UseDefaultBufferSize, soTimeout)
val brokerInfo = "host_%s-port_%s".format(host, port)
private val fetchRequestAndResponseStats =
FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
+ private var isClosed = false
private def connect(): BlockingChannel = {
close
@@ -58,7 +59,8 @@ class SimpleConsumer(val host: String,
def close() {
lock synchronized {
- disconnect()
+ disconnect()
+ isClosed = true
}
}
@@ -123,7 +125,7 @@ class SimpleConsumer(val host: String,
def getOffsetsBefore(request: OffsetRequest) =
OffsetResponse.readFrom(sendRequest(request).buffer)
private def getOrMakeConnection() {
- if(!blockingChannel.isConnected) {
+ if(!isClosed && !blockingChannel.isConnected) {
connect()
}
}
SimpleConsumer stops working after close (ConsumerOffsetChecker.scala, line 77)
> ConsumerFetcherThread can deadlock
> ----------------------------------
>
> Key: KAFKA-937
> URL: https://issues.apache.org/jira/browse/KAFKA-937
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Reporter: Jun Rao
> Assignee: Jun Rao
> Fix For: 0.8
>
> Attachments: kafka-937_delta.patch, kafka-937.patch
>
>
> We have the following access pattern that can introduce a deadlock.
> AbstractFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherManager.addPartitionsWithError() wait for lock ->
> LeaderFinderThread holding lock while calling
> AbstractFetcherManager.shutdownIdleFetcherThreads() ->
> AbstractFetcherManager calling fetcher.shutdown, which needs to wait until
> AbstractFetcherThread.processPartitionsWithError() completes.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira