Hi Xiao,

I think the fix for the IllegalMonitorStateException is correct. Can you also create a ticket and submit a patch?
Thanks.

Jiangjie (Becket) Qin

On 3/24/15, 4:31 PM, "tao xiao" <xiaotao...@gmail.com> wrote:

>Hi community,
>
>I wanted to know whether the solution I supplied fixes the
>IllegalMonitorStateException issue. Our work is blocked on this and
>we'd like to proceed ASAP. Sorry to bother you.
>
>On Mon, Mar 23, 2015 at 4:32 PM, tao xiao <xiaotao...@gmail.com> wrote:
>
>> I think I worked out the answer to question 1. The
>> java.lang.IllegalMonitorStateException was thrown because the thread
>> did not own the ReentrantLock when it called await() on the lock's
>> condition.
>>
>> Here is the code snippet from AbstractFetcherThread.scala in trunk:
>>
>>     partitionMapLock synchronized {
>>       partitionsWithError ++= partitionMap.keys
>>       // there is an error occurred while fetching partitions, sleep a while
>>       partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
>>     }
>>
>> As shown above, partitionMapLock is never actually acquired before
>> calling partitionMapCond.await: the synchronized block takes the lock
>> object's built-in monitor, not the ReentrantLock itself.
>>
>> We can fix this by explicitly acquiring partitionMapLock. The code
>> block below should work:
>>
>>     inLock(partitionMapLock) {
>>       partitionsWithError ++= partitionMap.keys
>>       // there is an error occurred while fetching partitions, sleep a while
>>       partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
>>     }
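The ownership mismatch described above is easy to reproduce outside Kafka. Below is a minimal, self-contained Scala sketch with illustrative names (LockOwnershipDemo is not Kafka code): calling await() on a ReentrantLock condition from inside a synchronized block throws IllegalMonitorStateException, because synchronized takes the lock object's built-in monitor rather than the ReentrantLock, while lock()/unlock() makes the current thread the owner. The inLock helper at the end is a sketch with the semantics the proposed fix assumes from kafka.utils: acquire, run the body, always release.

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.{Lock, ReentrantLock}

    object LockOwnershipDemo extends App {
      val lock = new ReentrantLock()
      val cond = lock.newCondition()

      // Broken: 'synchronized' acquires the monitor of the lock *object*,
      // not the ReentrantLock, so as far as the condition is concerned the
      // current thread does not own the lock and await() fails.
      try {
        lock synchronized {
          cond.await(100, TimeUnit.MILLISECONDS)
        }
      } catch {
        case e: IllegalMonitorStateException =>
          println(s"synchronized variant failed: $e")
      }

      // Correct: lock()/unlock() makes the current thread the owner, so
      // await() releases the lock and simply times out after 100 ms.
      lock.lock()
      try {
        cond.await(100, TimeUnit.MILLISECONDS)
      } finally {
        lock.unlock()
      }
      println("lock()/await() variant returned normally")

      // A wrapper in the spirit of the inLock helper used in the fix above
      // (sketch): acquire the lock, run the body, always release.
      def inLock[T](l: Lock)(body: => T): T = {
        l.lock()
        try body finally l.unlock()
      }

      inLock(lock) {
        cond.await(100, TimeUnit.MILLISECONDS)
      }
      println("inLock variant returned normally")
    }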
>>
>> On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I was running a mirror maker and got a
>>> java.lang.IllegalMonitorStateException that caused the underlying
>>> fetcher thread to stop completely. Here is the log from the mirror
>>> maker:
>>>
>>> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
>>> java.io.EOFException: Received -1 when reading from channel, socket has
>>> likely been closed. (kafka.consumer.SimpleConsumer)
>>> [2015-03-21 02:11:53,081] WARN
>>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
>>> fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
>>> phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
>>> RequestInfo: [test.topic,0] -> PartitionFetchInfo(3766065,1048576).
>>> Possible cause: java.nio.channels.ClosedChannelException
>>> (kafka.consumer.ConsumerFetcherThread)
>>> [2015-03-21 02:11:53,083] ERROR
>>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to
>>> (kafka.consumer.ConsumerFetcherThread)
>>> java.lang.IllegalMonitorStateException
>>>         at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
>>>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
>>>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
>>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>>> [2015-03-21 02:11:53,083] INFO
>>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
>>> (kafka.consumer.ConsumerFetcherThread)
>>>
>>> I am still investigating what caused the connection error on the
>>> server side, but I have a couple of questions related to the mirror
>>> maker itself:
>>>
>>> 1. What is the root cause of the java.lang.IllegalMonitorStateException?
>>> As shown in the AbstractFetcherThread source, the fetcher thread should
>>> catch the java.io.EOFException thrown from the underlying SimpleConsumer
>>> and sleep a while before the next run.
>>>
>>> 2. The mirror maker is unaware of the termination of the fetcher
>>> thread, which makes it unable to detect the failure and trigger a
>>> rebalance. I have 3 mirror maker instances running on 3 different
>>> machines listening to the same topic. I would expect the mirror maker
>>> to release partition ownership when the underlying fetcher thread
>>> terminates so that a rebalance can be triggered, but in fact this is
>>> not the case. Is this expected behavior or did I misconfigure
>>> something?
>>>
>>> I am running the trunk version as of commit
>>> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
>>>
>>> --
>>> Regards,
>>> Tao
>>
>>
>> --
>> Regards,
>> Tao
>
>
>--
>Regards,
>Tao
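On question 2 in the thread above: the gap described is that nothing supervises the fetcher thread, so its death never releases partition ownership or triggers a rebalance. The sketch below illustrates the general pattern being asked for, with hypothetical names (superviseThread and the cleanup callback are illustrative; this is not how mirror maker is actually wired): a daemon watchdog joins the worker thread and runs a cleanup callback once the worker exits for any reason.

    object FetcherWatchdogDemo extends App {
      // Hypothetical supervision sketch: join the worker from a daemon
      // thread and run cleanup when it exits, normally or not.
      def superviseThread(worker: Thread)(onDeath: => Unit): Thread = {
        val watchdog = new Thread(() => {
          worker.join()  // returns once the worker has exited
          onDeath        // e.g. release partition ownership, trigger a rebalance
        })
        watchdog.setDaemon(true)
        watchdog.start()
        watchdog
      }

      val fetcher = new Thread(() => {
        // simulates the fetcher logging "Stopped" and exiting after an error
        println("fetcher: fatal error, stopping")
      })
      fetcher.start()  // start before supervising so join() actually waits
      superviseThread(fetcher) {
        println("watchdog: fetcher died, releasing partition ownership")
      }
      fetcher.join()
      Thread.sleep(200)  // give the daemon watchdog a moment to fire
    }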