Hi Xiao,

I think the fix for IllegalStateExcepetion is correct.
Can you also create a ticket and submit a patch?

Thanks.

Jiangjie (Becket) Qin

On 3/24/15, 4:31 PM, "tao xiao" <xiaotao...@gmail.com> wrote:

>Hi community,
>
>I wanted to know if the solution I supplied can fix the
>IllegalMonitorStateException
>issue. Our work is pending on this and we'd like to proceed ASAP. Sorry
>for
>bothering.
>
>On Mon, Mar 23, 2015 at 4:32 PM, tao xiao <xiaotao...@gmail.com> wrote:
>
>> I think I worked out the answer to question 1.
>>java.lang.IllegalMonitorStateException
>> was thrown due to no ownership of ReentrantLock when trying to call
>>await()
>> on the lock condition.
>>
>> Here is the code snippet from the AbstractFetcherThread.scala in trunk
>>
>> partitionMapLock synchronized {
>>             partitionsWithError ++= partitionMap.keys
>>             // there is an error occurred while fetching partitions,
>>sleep
>> a while
>>             partitionMapCond.await(fetchBackOffMs,
>>TimeUnit.MILLISECONDS)
>> }
>>
>> as shown above partitionMapLock is not acquired before calling
>> partitionMapCond.await
>>
>> we can fix this by explicitly calling partitionMapLock.lock(). below
>>code
>> block should work
>>
>> inLock(partitionMapLock) {
>>             partitionsWithError ++= partitionMap.keys
>>             // there is an error occurred while fetching partitions,
>>sleep
>> a while
>>             partitionMapCond.await(fetchBackOffMs,
>>TimeUnit.MILLISECONDS)
>> }
>>
>> On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I was running a mirror maker and got
>>>  java.lang.IllegalMonitorStateException that caused the underlying
>>>fetcher
>>> thread completely stopped. Here is the log from mirror maker.
>>>
>>> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
>>> java.io.EOFException: Received -1 when reading from channel, socket has
>>> likely been closed. (kafka.consumer.SimpleConsumer)
>>> [2015-03-21 02:11:53,081] WARN
>>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
>>> fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
>>> phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
>>> RequestInfo: [test.topic,0] -> PartitionFetchInfo(3766065,1048576).
>>> Possible cause: java.nio.channels.ClosedChannelException
>>> (kafka.consumer.ConsumerFetcherThread)
>>> [2015-03-21 02:11:53,083] ERROR
>>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error
>>>due to
>>>  (kafka.consumer.ConsumerFetcherThread)
>>> java.lang.IllegalMonitorStateException
>>>         at
>>> 
>>>java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.j
>>>ava:155)
>>>         at
>>> 
>>>java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQu
>>>euedSynchronizer.java:1260)
>>>         at
>>> 
>>>java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(Abstr
>>>actQueuedSynchronizer.java:1723)
>>>         at
>>> 
>>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
>>>ait(AbstractQueuedSynchronizer.java:2166)
>>>         at
>>> 
>>>kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherTh
>>>read.scala:106)
>>>         at
>>> 
>>>kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90
>>>)
>>>         at 
>>>kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>>> [2015-03-21 02:11:53,083] INFO
>>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
>>>  (kafka.consumer.ConsumerFetcherThread)
>>>
>>> I am still investigating what caused the connection error on server
>>>side
>>> but I have a couple of questions related to mirror maker itself
>>>
>>> 1. What is root cause of java.lang.IllegalMonitorStateException? As
>>>shown
>>> in the AbstractFetcherThread source the fetcher thread should catch the
>>> java.io.EOFException thrown from underlying simplyConsumer and sleep a
>>> while before next run.
>>> 2. Mirror maker is unaware of the termination of fetcher thread. That
>>> makes it unable to detect the failure and trigger rebalancing. I have 3
>>> mirror maker instances running in 3 different machines listening to the
>>> same topic. I would expect the mirror maker will release the partition
>>> ownership when underlying fetcher thread terminates so that
>>>rebalancing can
>>> be triggered.but in fact this is not the case. is this expected
>>>behavior or
>>> do I miss configure anything?
>>>
>>> I am running the trunk version as of commit
>>> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
>
>-- 
>Regards,
>Tao

Reply via email to