This is because the memory channel has a default transaction capacity of 100. 
Increasing it (or keeping the sink's batchSize below the channel's transaction 
capacity) will fix the issue. See 
http://flume.apache.org/FlumeUserGuide.html#memory-channel for more details.
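
As a minimal sketch of that fix, assuming the agent, channel, and sink names 
from the hdfs.conf quoted below (agent46, memch1, myhdfssink): raise the 
channel's transactionCapacity above the sink's batchSize. The values here are 
illustrative only, not tuned recommendations. Note also that the quoted config 
spells the key transactionCapactiy, which Flume would presumably not recognize, 
so the default of 100 likely stayed in effect.

```properties
# Sketch only: names come from the quoted hdfs.conf; the values are illustrative.
agent46.channels.memch1.type = memory
# Total number of events the channel can hold.
agent46.channels.memch1.capacity = 10000
# Maximum events per transaction (default 100). Check the spelling of this key.
agent46.channels.memch1.transactionCapacity = 2000
# The sink takes up to batchSize events in one transaction,
# so keep it below the channel's transactionCapacity.
agent46.sinks.myhdfssink.hdfs.batchSize = 1000
```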

Hari  

--  
Hari Shreedharan


On Monday, February 25, 2013 at 11:41 PM, 周梦想 wrote:

> I found that if agent46.sinks.myhdfssink.hdfs.batchSize >= 100, it reports 
> this error.
> If I set it to 10, it works, but it's a bit slower.
>  
> Best Regards,
> Andy
>  
> 2013/2/26 周梦想 <abloz...@gmail.com>
> > more logs:
> >  
> > 2013-02-26 14:37:00,380 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
> > org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
> >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
> >         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> >         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> >         at java.lang.Thread.run(Thread.java:722)
> > Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
> >         at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
> >         at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
> >         at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
> >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
> >         ... 3 more
> > 2013-02-26 14:37:02,854 (pool-7-thread-1) [ERROR - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro source userlogsrc: Unable to process event batch. Exception follows.
> > org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: memch1}
> >         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
> >         at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
> >         at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:601)
> >         at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
> >         at org.apache.avro.ipc.Responder.respond(Responder.java:149)
> >         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
> >         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
> >         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
> >         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> >         at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
> >         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
> >         at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
> >         at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
> >         at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
> >         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
> >         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> >         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
> >         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
> >         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
> >         at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
> >         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
> >         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
> >         at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
> >         at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> >         at java.lang.Thread.run(Thread.java:722)
> > Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
> >         at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
> >         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> >         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> >         ... 28 more
> >  
> >  
> >  
> >  
> > 2013/2/26 周梦想 <abloz...@gmail.com>
> > > Hello,
> > > I am using flume-ng to send data from Windows to HDFS on Linux through the 
> > > Avro protocol, and encountered this error:
> > >  
> > > 2013-02-26 12:21:02,908 (pool-8-thread-1) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro source userlogsrc: Received avro event batch of 100 events.
> > >  
> > > 2013-02-26 12:21:03,107 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:460)] process failed
> > > org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
> > >         at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
> > >         at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
> > >         at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
> > >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
> > >         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > >         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > >         at java.lang.Thread.run(Thread.java:722)
> > >  
> > > Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
> > >         at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
> > >         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> > >         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> > >         ... 28 more
> > >  
> > >  
> > > I have set the memory channel capacity to 1000, but it still reports this 
> > > error.
> > > Can someone give me any advice?
> > >  
> > > Thanks,
> > > Andy
> > >  
> > > hdfs.conf:
> > >  
> > > agent46.sources = userlogsrc gamelogsrc
> > > agent46.channels = memch1
> > > agent46.sinks = myhdfssink
> > >  
> > > #channels:
> > > agent46.channels.memch1.type = memory
> > > agent46.channels.memch1.capacity = 10000
> > > agent46.channels.memch1.transactionCapactiy = 100
> > > #sources:
> > > #userlogsrc:
> > > #agent46.sources.userlogsrc.type = syslogTcp
> > > agent46.sources.userlogsrc.type = avro
> > > agent46.sources.userlogsrc.port = 5140
> > > agent46.sources.userlogsrc.bind= 0.0.0.0
> > > #agent46.sources.userlogsrc.host= hadoop48
> > > agent46.sources.userlogsrc.interceptors = i1 i2 i3
> > > agent46.sources.userlogsrc.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
> > > agent46.sources.userlogsrc.interceptors.i1.preserveExisting = true
> > > #agent46.sources.userlogsrc.interceptors.i1.hostHeader = hostname
> > > agent46.sources.userlogsrc.interceptors.i1.useIP = false
> > > agent46.sources.userlogsrc.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
> > > agent46.sources.userlogsrc.interceptors.i3.type = static
> > > agent46.sources.userlogsrc.interceptors.i3.key = datacenter
> > > agent46.sources.userlogsrc.interceptors.i3.value = userdata
> > > agent46.sources.userlogsrc.channels = memch1
> > > #gamelogsrc:
> > > #agent46.sources.gamelogsrc.type = syslogTcp
> > > agent46.sources.gamelogsrc.type = avro
> > > agent46.sources.gamelogsrc.port = 5150
> > > agent46.sources.gamelogsrc.bind= 0.0.0.0
> > > agent46.sources.gamelogsrc.channels = memch1
> > >  
> > > #sinks:
> > > agent46.sinks.myhdfssink.channel = memch1
> > >  
> > > agent46.sinks.myhdfssink.type = hdfs
> > > agent46.sinks.myhdfssink.hdfs.rollInterval = 120
> > > agent46.sinks.myhdfssink.hdfs.appendTimeout = 1000
> > > agent46.sinks.myhdfssink.hdfs.rollSize = 209715200
> > > agent46.sinks.myhdfssink.hdfs.rollCount = 600000
> > > agent46.sinks.myhdfssink.hdfs.batchSize = 1000
> > > agent46.sinks.myhdfssink.hdfs.txnEventMax = 100000
> > > agent46.sinks.myhdfssink.hdfs.threadsPoolSize= 100
> > > agent46.sinks.myhdfssink.hdfs.path = hdfs://h46:9000/flume/%{filename}/%m%d
> > > #agent46.sinks.myhdfssink.hdfs.filePrefix = userlogsrc.%{host}
> > > #agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{hostname}.%{datacenter}.%Y%m%d
> > > agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{host}.%Y%m%d
> > > #agent46.sinks.myhdfssink.hdfs.rollInterval = 60
> > > #agent46.sinks.myhdfssink.hdfs.fileType = SequenceFile
> > > agent46.sinks.myhdfssink.hdfs.fileType = DataStream
> > > #agent46.sinks.myhdfssink.hdfs.file.writeFormat= Text
> > >  
> > >  
> > >  
> >  
>  
