Hi,

The spooling directory source does not have built-in functionality to read compressed files. One could be built, but I think it would require either a subclass of
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializer.java
or changes to
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
so that it recognizes compressed files.
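For anyone who wants to experiment with that, here is a minimal stand-alone Java sketch (plain JDK classes only, not the Flume API) of the decompression step such a deserializer or reader would have to add. It also shows why the stock line-oriented reader fails on a .gz file: raw gzip bytes start with the magic header 0x1f 0x8b and are not valid text lines. The class and method names are just illustrative, not anything in Flume.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLineDemo {

    // Compress a string to gzip bytes, like the contents of a spooled .gz file.
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    // Decompress first -- the step a gzip-aware deserializer/reader would
    // perform before handing bytes to the usual line splitting.
    static String gunzip(byte[] data) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] compressed = gzip("event one\nevent two\n");
        // Gzip streams begin with the magic bytes 0x1f 0x8b, which is binary
        // data, not UTF-8 lines -- the stock spooldir reader cannot parse it.
        System.out.println(String.format("%02x %02x", compressed[0], compressed[1]));
        System.out.print(gunzip(compressed));
    }
}
```

A real implementation would do the GZIPInputStream wrapping inside the reader/deserializer so the rest of the spooldir pipeline still sees plain lines.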
Regarding over-the-wire compression, there is an open review on that now:
https://reviews.apache.org/r/9427/

Brock

On Sat, Feb 23, 2013 at 9:17 AM, Langston, Jim <[email protected]> wrote:
> Hi all,
>
> A question on sending compressed files from a remote source
> to HDFS.
>
> I have been working with .94 of Flume and gzip a file before sending
> it from a remote location to an HDFS cluster. Works great.
>
> Now, I'm looking to move to 1.2 or 1.3.1 (CDH4 installs 1.2 by
> default through the management tool), but I don't see the
> equivalent in 1.2 or 1.3.1. I found the reference to the
> new source in 1.3.1, spoolDir, but when I try to pick up a compressed
> file in the spool directory I get an error:
>
> 13/02/22 19:32:41 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
> org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>         at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:143)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
>         at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>         ... 9 more
>
> I have tried to increase the buffer size, but it did not change the
> error. My current configuration file, which
> generated the error:
>
> # Compressed Source
> agent_compressed.sources = r1
> agent_compressed.channels = c1
> agent_compressed.channels.c1.type = memory
> agent_compressed.sources.r1.type = spooldir
> agent_compressed.sources.r1.bufferMaxLineLength = 50000
> agent_compressed.sources.r1.spoolDir = /tmp/COMPRESS
> agent_compressed.sources.r1.fileHeader = true
> agent_compressed.sources.r1.channels = c1
>
> # Sink for Avro
> agent_compressed.sinks = avroSink-2
> agent_compressed.sinks.avroSink-2.type = avro
> agent_compressed.sinks.avroSink-2.channel = c1
> agent_compressed.sinks.avroSink-2.hostname = xxx.xxx.xxx.xxx
> agent_compressed.sinks.avroSink-2.port = xxxxx
>
> Thoughts? Hints?
>
> Thanks,
>
> Jim

--
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
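One side note on the exception in the quoted message: "Space for commit to queue couldn't be acquired" refers to the memory channel's internal queue, so bufferMaxLineLength (a source-side line buffer) would not affect it. If you want to rule the channel out, a sketch of raising the memory channel's queue settings might look like the following (capacity and transactionCapacity are standard memory channel properties; the numbers here are arbitrary examples, not tuned recommendations):

```
agent_compressed.channels.c1.type = memory
agent_compressed.channels.c1.capacity = 10000
agent_compressed.channels.c1.transactionCapacity = 1000
```

That said, since the spooldir source is feeding it raw gzip bytes it cannot parse, fixing the channel sizing alone is unlikely to make the compressed-file case work.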
