Makes sense ... I will create the JIRA and attach the patch too.

Thanks,
Viral


On Fri, Oct 17, 2014 at 8:18 AM, Hari Shreedharan <hshreedha...@cloudera.com
> wrote:

> I think we should get rid of the sfWriters lock and just use synchronized
> blocks everywhere.
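>
> A rough sketch of what that could look like (hypothetical names, not the
> actual Flume source or any attached patch): make the writers map itself the
> single monitor, so the lookup/insert path and the close callback can never
> take a separate sfWritersLock in opposite orders:
>
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical sketch only: one monitor (the map itself) guards every access.
> class WriterRegistry {
>     private final Map<String, AutoCloseable> sfWriters = new HashMap<>();
>
>     AutoCloseable getOrCreate(String path) {
>         synchronized (sfWriters) {              // single lock for all map access
>             AutoCloseable writer = sfWriters.get(path);
>             if (writer == null) {
>                 writer = openNewWriter(path);   // placeholder for opening a BucketWriter
>                 sfWriters.put(path, writer);
>             }
>             return writer;
>         }
>     }
>
>     void onWriterClosed(String path) {          // close-callback path
>         synchronized (sfWriters) {
>             sfWriters.remove(path);
>         }
>     }
>
>     // Placeholder only; the real code opens an HDFS/S3 file here.
>     private AutoCloseable openNewWriter(String path) {
>         return () -> { };
>     }
> }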
>
> Thanks, Hari
>
>
> On Mon, Jul 28, 2014 at 3:04 PM, Viral Bajaria <viral.baja...@gmail.com>
> wrote:
>
>> I found the issue.
>>
>> The deadlock happens since I have 3 events that can close a file:
>>
>> 1) maxOpenFiles: set to 500
>> 2) maxFileSize: set to 128MB
>> 3) idleTimeout: set to 30 seconds
>>
>> The race condition occurs when the same BucketWriter is closed via both the
>> maxOpenFiles eviction and the idleTimeout at the same time: the two paths run
>> on separate threads, but each one goes through synchronized code (a
>> synchronized block on one side, a synchronized method on the other).
>>
>> The idleTimeout thread kicks in and calls close(true) on the BucketWriter,
>> which is a synchronized method. That in turn calls the closeCallback, which
>> has a synchronized block that removes the entry from the sfWriters
>> LinkedHashMap, so the thread now waits on the sfWritersLock.
>>
>> In the meantime, the sink thread tried to add an entry to the LinkedHashMap,
>> having already acquired the sfWritersLock. The map had reached its limit of
>> 500, so removeEldestEntry was called, which called close() on that same
>> BucketWriter, which ends up in close(false). Because the idleTimeout thread
>> already holds that BucketWriter's monitor, the sink thread blocks there. And
>> since the sink thread already holds the sfWritersLock, the idleTimeout thread
>> is now blocked on it.
>>
>> This results in a deadlock.
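>>
>> To make the lock ordering concrete, here is a stripped-down, self-contained
>> illustration of the two paths (plain placeholder objects, not the Flume
>> classes); running it blocks the two threads against each other in the same
>> two-way wait the jstack output in my original mail shows:
>>
>> // Thread A mimics the idleTimeout path: BucketWriter monitor first, then the
>> // map lock. Thread B mimics the sink process path: map lock first, then the
>> // BucketWriter monitor. Opposite acquisition order => deadlock.
>> public class LockOrderDemo {
>>     private static final Object writerMonitor = new Object();  // stands in for the BucketWriter
>>     private static final Object sfWritersLock = new Object();  // stands in for the map lock
>>
>>     public static void main(String[] args) {
>>         Thread idlePath = new Thread(() -> {
>>             synchronized (writerMonitor) {          // close(true) is synchronized
>>                 pause();
>>                 synchronized (sfWritersLock) {      // close callback removes the map entry
>>                     System.out.println("idle path done");
>>                 }
>>             }
>>         }, "idle-timeout-path");
>>
>>         Thread sinkPath = new Thread(() -> {
>>             synchronized (sfWritersLock) {          // process() holds the map lock
>>                 pause();
>>                 synchronized (writerMonitor) {      // removeEldestEntry -> close(false)
>>                     System.out.println("sink path done");
>>                 }
>>             }
>>         }, "sink-process-path");
>>
>>         idlePath.start();
>>         sinkPath.start();                           // neither println is ever reached
>>     }
>>
>>     private static void pause() {
>>         try { Thread.sleep(200); } catch (InterruptedException ignored) { }
>>     }
>> }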
>>
>> For now I have removed the idleTimeout, since a file will eventually get
>> closed by the LRU eviction anyway, but this is a known issue.
>>
>> Should I open a JIRA for this?
>>
>> Thanks,
>> Viral
>>
>>
>>
>>
>>
>> On Sun, Jul 27, 2014 at 7:25 PM, Viral Bajaria <viral.baja...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am using 1.5.0 with the hdfs sink to write files to S3. The process
>>> writes fine for a while, but eventually I start getting the following
>>> messages:
>>>
>>>  INFO  [pool-5-thread-1]
>>> (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents:224)
>>>  - Last read was never committed - resetting mark position.
>>> WARN  [pool-5-thread-1]
>>> (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:238)
>>>  - The channel is full, and cannot write data now. The source will try
>>> again after 4000 milliseconds
>>>
>>> I looked into the logs and I don't see any errors besides the channel
>>> being full.
>>>
>>> I saw that the number of open files for writing to HDFS is one shy of the
>>> max count (500), and that's when this issue starts, every single time.
>>>
>>> Following are my configuration settings:
>>>
>>>  *BEGIN CONFIG*
>>> agent.sources = spooldir
>>> agent.channels = memoryChannel
>>> agent.sinks = s3Sink
>>>
>>> agent.sources.spooldir.channels = memoryChannel
>>> agent.sources.spooldir.type = spooldir
>>> agent.sources.spooldir.spoolDir = <spool_directory>
>>> agent.sources.spooldir.deserializer.maxLineLength = 4096
>>> agent.sources.spooldir.decodeErrorPolicy = IGNORE
>>>
>>> #s3 sink
>>> agent.sinks.s3Sink.channel = memoryChannel
>>> agent.sinks.s3Sink.type = hdfs
>>> agent.sinks.s3Sink.hdfs.rollInterval = 0
>>> agent.sinks.s3Sink.hdfs.rollSize = 134217728
>>> agent.sinks.s3Sink.hdfs.rollCount = 0
>>> agent.sinks.s3Sink.hdfs.batchSize = 1000
>>> agent.sinks.s3Sink.hdfs.maxOpenFiles = 500
>>> agent.sinks.s3Sink.hdfs.idleTimeout = 30
>>> agent.sinks.s3Sink.hdfs.codeC = gzip
>>> agent.sinks.s3Sink.hdfs.writeFormat = Text
>>> agent.sinks.s3Sink.hdfs.path = <s3_path>
>>> #hdfs sink uses java.util.TimeZone and needs the long timezone format
>>> agent.sinks.s3Sink.hdfs.timeZone = America/New_York
>>>
>>>
>>> agent.channels.memoryChannel.type = memory
>>> agent.channels.memoryChannel.capacity = 100000
>>> agent.channels.memoryChannel.transactionCapacity = 1000
>>>
>>> *END CONFIG*
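>>>
>>> For context on the maxOpenFiles setting above: the sink caps the number of
>>> open bucket writers with an LRU LinkedHashMap whose removeEldestEntry()
>>> closes and evicts the least recently used writer once the cap is exceeded.
>>> A minimal sketch of that pattern (illustrative only, not the Flume source):
>>>
>>> import java.util.LinkedHashMap;
>>> import java.util.Map;
>>>
>>> // Illustrative LRU map: removeEldestEntry() closes and drops the least
>>> // recently used writer once more than maxOpenFiles entries exist.
>>> class LruWriterMap extends LinkedHashMap<String, AutoCloseable> {
>>>     private final int maxOpenFiles;
>>>
>>>     LruWriterMap(int maxOpenFiles) {
>>>         super(16, 0.75f, true);                 // true = access order (LRU)
>>>         this.maxOpenFiles = maxOpenFiles;
>>>     }
>>>
>>>     @Override
>>>     protected boolean removeEldestEntry(Map.Entry<String, AutoCloseable> eldest) {
>>>         if (size() > maxOpenFiles) {
>>>             try {
>>>                 eldest.getValue().close();      // close the evicted writer
>>>             } catch (Exception e) {
>>>                 // in real code: log and keep going
>>>             }
>>>             return true;                        // drop the eldest mapping
>>>         }
>>>         return false;
>>>     }
>>> }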
>>>
>>> Out of curiosity, I did a jstack on the process and it says there is a
>>> deadlock:
>>>
>>>  Java stack information for the threads listed above:
>>> ===================================================
>>> "hdfs-s3Sink-roll-timer-0":
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:404)
>>>         - waiting to lock <0x000000077ae51798> (a java.lang.Object)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter.runCloseAction(BucketWriter.java:487)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:448)
>>>         - locked <0x0000000781f08ec0> (a
>>> org.apache.flume.sink.hdfs.BucketWriter)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:472)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:467)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> "SinkRunner-PollingRunner-DefaultSinkProcessor":
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:331)
>>>         - waiting to lock <0x0000000781f08ec0> (a
>>> org.apache.flume.sink.hdfs.BucketWriter)
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSEventSink$WriterLinkedHashMap.removeEldestEntry(HDFSEventSink.java:175)
>>>         at java.util.LinkedHashMap.addEntry(LinkedHashMap.java:431)
>>>         at java.util.HashMap.put(HashMap.java:509)
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:415)
>>>         - locked <0x000000077ae51798> (a java.lang.Object)
>>>         at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>         at
>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> Found 1 deadlock.
>>>
>>> Is the deadlock the reason the channel is getting choked and not moving
>>> forward?
>>>
>>> Thanks,
>>> Viral
>>>
>>>
>>>
>>
>
