This is because the memory channel has a default transaction capacity of 100. Increasing it (or keeping the sink's batchSize below the channel's transaction capacity) will fix the issue. See http://flume.apache.org/FlumeUserGuide.html#memory-channel for more details.
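For example, here is a minimal sketch using the agent46/memch1/myhdfssink names from the config quoted below (note the property must be spelled transactionCapacity, and the channel's capacity must be >= its transactionCapacity):

    agent46.channels.memch1.type = memory
    agent46.channels.memch1.capacity = 10000
    # let each take/put transaction hold as many events as the sink batches
    agent46.channels.memch1.transactionCapacity = 1000

    # keep the sink's batch size no larger than the channel's transactionCapacity
    agent46.sinks.myhdfssink.hdfs.batchSize = 1000

With those two values aligned, each sink transaction takes at most 1000 events, which fits within the channel's per-transaction limit.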
Hari

-- 
Hari Shreedharan

On Monday, February 25, 2013 at 11:41 PM, 周梦想 wrote:

> I found that if agent46.sinks.myhdfssink.hdfs.batchSize >= 100, it reports
> this error. If I set it to 10, it works, but it's a bit slower.
>
> Best Regards,
> Andy
>
> 2013/2/26 周梦想 <abloz...@gmail.com>
> > more logs:
> >
> > 2013-02-26 14:37:00,380 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> > [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]
> > Unable to deliver event. Exception follows.
> > org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException:
> > Take list for MemoryTransaction, capacity 100 full, consider committing
> > more frequently, increasing capacity, or increasing thread count
> >     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
> >     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> >     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> >     at java.lang.Thread.run(Thread.java:722)
> > Caused by: org.apache.flume.ChannelException: Take list for
> > MemoryTransaction, capacity 100 full, consider committing more frequently,
> > increasing capacity, or increasing thread count
> >     at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
> >     at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
> >     at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
> >     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
> >     ... 3 more
> > 2013-02-26 14:37:02,854 (pool-7-thread-1) [ERROR -
> > org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
> > source userlogsrc: Unable to process event batch. Exception follows.
> > org.apache.flume.ChannelException: Unable to put batch on required channel:
> > org.apache.flume.channel.MemoryChannel{name: memch1}
> >     at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
> >     at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
> >     at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:601)
> >     at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
> >     at org.apache.avro.ipc.Responder.respond(Responder.java:149)
> >     at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
> >     at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
> >     at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
> >     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> >     at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
> >     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
> >     at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
> >     at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
> >     at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
> >     at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
> >     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> >     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
> >     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
> >     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
> >     at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
> >     at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
> >     at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
> >     at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
> >     at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> >     at java.lang.Thread.run(Thread.java:722)
> > Caused by: org.apache.flume.ChannelException: Space for commit to queue
> > couldn't be acquired Sinks are likely not keeping up with sources, or the
> > buffer size is too tight
> >     at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
> >     at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> >     at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> >     ... 28 more
> >
> > 2013/2/26 周梦想 <abloz...@gmail.com>
> > > hello,
> > > I am using flume-ng to send data from Windows to Linux HDFS through the
> > > avro protocol, and encountered this error:
> > >
> > > 2013-02-26 12:21:02,908 (pool-8-thread-1) [DEBUG -
> > > org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro
> > > source userlogsrc: Received avro event batch of 100 events.
> > > 2013-02-26 12:21:03,107 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> > > [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:460)]
> > > process failed
> > > org.apache.flume.ChannelException: Take list for MemoryTransaction,
> > > capacity 100 full, consider committing more frequently, increasing
> > > capacity, or increasing thread count
> > >     at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
> > >     at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
> > >     at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
> > >     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
> > >     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > >     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > >     at java.lang.Thread.run(Thread.java:722)
> > > Caused by: org.apache.flume.ChannelException: Space for commit to queue
> > > couldn't be acquired Sinks are likely not keeping up with sources, or the
> > > buffer size is too tight
> > >     at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
> > >     at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> > >     at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> > >     ... 28 more
> > >
> > > I have set memory channel capacity to 1000, but it still reports this
> > > error. Can someone give me any advice?
> > >
> > > Thanks,
> > > Andy
> > >
> > > hdfs.conf:
> > >
> > > agent46.sources = userlogsrc gamelogsrc
> > > agent46.channels = memch1
> > > agent46.sinks = myhdfssink
> > >
> > > #channels:
> > > agent46.channels.memch1.type = memory
> > > agent46.channels.memch1.capacity = 10000
> > > agent46.channels.memch1.transactionCapactiy = 100
> > > #sources:
> > > #userlogsrc:
> > > #agent46.sources.userlogsrc.type = syslogTcp
> > > agent46.sources.userlogsrc.type = avro
> > > agent46.sources.userlogsrc.port = 5140
> > > agent46.sources.userlogsrc.bind = 0.0.0.0
> > > #agent46.sources.userlogsrc.host = hadoop48
> > > agent46.sources.userlogsrc.interceptors = i1 i2 i3
> > > agent46.sources.userlogsrc.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
> > > agent46.sources.userlogsrc.interceptors.i1.preserveExisting = true
> > > #agent46.sources.userlogsrc.interceptors.i1.hostHeader = hostname
> > > agent46.sources.userlogsrc.interceptors.i1.useIP = false
> > > agent46.sources.userlogsrc.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
> > > agent46.sources.userlogsrc.interceptors.i3.type = static
> > > agent46.sources.userlogsrc.interceptors.i3.key = datacenter
> > > agent46.sources.userlogsrc.interceptors.i3.value = userdata
> > > agent46.sources.userlogsrc.channels = memch1
> > > #gamelogsrc:
> > > #agent46.sources.gamelogsrc.type = syslogTcp
> > > agent46.sources.gamelogsrc.type = avro
> > > agent46.sources.gamelogsrc.port = 5150
> > > agent46.sources.gamelogsrc.bind = 0.0.0.0
> > > agent46.sources.gamelogsrc.channels = memch1
> > >
> > > #sinks:
> > > agent46.sinks.myhdfssink.channel = memch1
> > > agent46.sinks.myhdfssink.type = hdfs
> > > agent46.sinks.myhdfssink.hdfs.rollInterval = 120
> > > agent46.sinks.myhdfssink.hdfs.appendTimeout = 1000
> > > agent46.sinks.myhdfssink.hdfs.rollSize = 209715200
> > > agent46.sinks.myhdfssink.hdfs.rollCount = 600000
> > > agent46.sinks.myhdfssink.hdfs.batchSize = 1000
> > > agent46.sinks.myhdfssink.hdfs.txnEventMax = 100000
> > > agent46.sinks.myhdfssink.hdfs.threadsPoolSize = 100
> > > agent46.sinks.myhdfssink.hdfs.path = hdfs://h46:9000/flume/%{filename}/%m%d
> > > #agent46.sinks.myhdfssink.hdfs.filePrefix = userlogsrc.%{host}
> > > #agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{hostname}.%{datacenter}.%Y%m%d
> > > agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{host}.%Y%m%d
> > > #agent46.sinks.myhdfssink.hdfs.rollInterval = 60
> > > #agent46.sinks.myhdfssink.hdfs.fileType = SequenceFile
> > > agent46.sinks.myhdfssink.hdfs.fileType = DataStream
> > > #agent46.sinks.myhdfssink.hdfs.file.writeFormat = Text