Can you provide more detail about what kinds of services? If you roll the logs every 5 minutes or so, then you can configure the spooling source to pick them up once they are rolled: either roll them into a directory of immutable files, or use the trunk version of the spooling file source to specify a filter that ignores files which don't match a "rolled" pattern.
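To illustrate the filter approach, here is a sketch of a spooling-directory source that skips files still being written. It assumes a trunk/newer build where the source supports an ignore filter; the `ignorePattern` property name, the `.rolled` suffix, and the regex are illustrative, so check the documentation for your version:

```properties
# Sketch: spooldir source that only consumes rolled (immutable) files.
# Assumes trunk/newer Flume; property names may differ in your release.
a1.sources = r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/app/spool
# Java regex with a negative lookahead: ignore any file that does NOT
# yet end in ".rolled" (i.e. files the log roller hasn't finished).
a1.sources.r1.ignorePattern = ^(?!.*\.rolled$).*$
```

With a setup like this, the roller can write `app.log` in place and rename it to `app.log.<timestamp>.rolled` when done; Flume only ever opens the finished files.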
You could also use the exec source with "tail -F", but that is much less reliable than the spooling file source.

Regards,
Mike

On Thu, Jan 17, 2013 at 10:23 PM, Henry Ma <[email protected]> wrote:
> OK, thank you very much, now I know why the problem occurs.
>
> I am a newcomer to Flume. Here is my scenario: using Flume to collect
> from hundreds of directories on dozens of servers into a central storage.
> It seems that the spooling directory source may not be the best choice. Can
> someone give me some advice about how to design the architecture? Which
> types of source and sink would fit?
>
> Thanks!
>
> On Fri, Jan 18, 2013 at 2:05 PM, Mike Percy <[email protected]> wrote:
>> Hi Henry,
>> The files must be immutable before putting them into the spooling
>> directory. So if you copy them from a different file system then you can
>> run into this issue. The right way to do it is to copy them to the same
>> file system and then atomically move them into the spooling directory.
>>
>> Regards,
>> Mike
>>
>> On Thu, Jan 17, 2013 at 9:59 PM, Henry Ma <[email protected]> wrote:
>>> Thank you very much! I cleaned all the related dirs and restarted. I
>>> kept the source spooling dir empty, then started Flume, and then put
>>> some files into the spooling dir.
>>> But this time a new error occurred:
>>>
>>> 13/01/18 13:44:24 INFO avro.SpoolingFileLineReader: Preparing to move file
>>> /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp
>>> to /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp.COMPLETED
>>> 13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>> java.lang.IllegalStateException: File has changed size since being read:
>>> /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp
>>>   at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:241)
>>>   at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
>>>   at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>   at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>   at java.lang.Thread.run(Thread.java:662)
>>> 13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>> java.io.IOException: Stream closed
>>>   at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>>   at java.io.BufferedReader.readLine(BufferedReader.java:292)
>>>   at java.io.BufferedReader.readLine(BufferedReader.java:362)
>>>   at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180)
>>>   at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>   at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>   at java.lang.Thread.run(Thread.java:662)
>>> 13/01/18 13:44:25 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>> java.io.IOException: Stream closed
>>>   at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>>
>>> I think it is a typical scenario: Flume is watching some dirs and
>>> collecting newly arriving files. I don't know why the exception "File has
>>> changed size since being read" was thrown or how to avoid it. Can you
>>> give some advice and guidance? Thanks!
>>>
>>> On Fri, Jan 18, 2013 at 1:48 PM, Patrick Wendell <[email protected]> wrote:
>>>> Hey Henry,
>>>>
>>>> The spooling source assumes that each file is uniquely named. If it
>>>> sees that a new file has arrived with a name it has already processed
>>>> (and has rolled over to a COMPLETED file), it throws an error and
>>>> shuts down.
>>>> This is to try to prevent sending duplicate data downstream.
>>>>
>>>> Probably the best idea is to clear out the COMPLETED file (and the
>>>> original file, if they are indeed the same one) and restart.
>>>>
>>>> - Patrick
>>>>
>>>> On Thu, Jan 17, 2013 at 9:31 PM, Brock Noland <[email protected]> wrote:
>>>> > Hmm, I think this is probably the root cause. Looks like there was a
>>>> > file with that name already used.
>>>> >
>>>> > 13/01/18 13:16:59 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>>> > java.lang.IllegalStateException: File name has been re-used with
>>>> > different files. Spooling assumption violated for
>>>> > /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118100000-20130118100100.hs009.ssp.COMPLETED
>>>> >   at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:272)
>>>> >   at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
>>>> >   at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>> >   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>> >   at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>> >   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>> >   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>> >   at java.lang.Thread.run(Thread.java:662)
>>>> >
>>>> > On Thu, Jan 17, 2013 at 9:22 PM, Henry Ma <[email protected]> wrote:
>>>> >> attached is the log file.
>>>> >>
>>>> >> the content of the conf file:
>>>> >> # Name the components on this agent
>>>> >> a1.sources = r1
>>>> >> a1.sinks = k1
>>>> >> a1.channels = c1
>>>> >>
>>>> >> # Describe/configure the source
>>>> >> a1.sources.r1.type = spooldir
>>>> >> a1.sources.r1.spoolDir = /disk2/mahy/FLUME_TEST/source
>>>> >> a1.sources.r1.channels = c1
>>>> >>
>>>> >> # Describe the sink
>>>> >> a1.sinks.k1.type = file_roll
>>>> >> a1.sinks.k1.sink.directory = /disk2/mahy/FLUME_TEST/sink
>>>> >> a1.sinks.k1.sink.rollInterval = 0
>>>> >>
>>>> >> # Use a channel which buffers events in memory
>>>> >> a1.channels.c1.type = memory
>>>> >> a1.channels.c1.capacity = 99999
>>>> >> #a1.channels.c1. = /disk2/mahy/FLUME_TEST/check
>>>> >> #a1.channels.c1.dataDirs = /disk2/mahy/FLUME_TEST/channel-data
>>>> >>
>>>> >> # Bind the source and sink to the channel
>>>> >> a1.sources.r1.channels = c1
>>>> >> a1.sinks.k1.channel = c1
>>>> >>
>>>> >> On Fri, Jan 18, 2013 at 12:39 PM, Brock Noland <[email protected]> wrote:
>>>> >>> Hi,
>>>> >>>
>>>> >>> Would you mind turning logging to debug and then posting your full
>>>> >>> log/config?
>>>> >>>
>>>> >>> Brock
>>>> >>>
>>>> >>> On Thu, Jan 17, 2013 at 8:24 PM, Henry Ma <[email protected]> wrote:
>>>> >>> > Hi,
>>>> >>> >
>>>> >>> > When using the Spooling Directory Source in Flume NG 1.3.1, this
>>>> >>> > exception happens:
>>>> >>> >
>>>> >>> > 13/01/18 11:37:09 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>>> >>> > java.io.IOException: Stream closed
>>>> >>> >   at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>>> >>> >   at java.io.BufferedReader.readLine(BufferedReader.java:292)
>>>> >>> >   at java.io.BufferedReader.readLine(BufferedReader.java:362)
>>>> >>> >   at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180)
>>>> >>> >   at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>> >>> >   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>> >>> >   at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>> >>> >   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>> >>> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>> >>> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>> >>> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>> >>> >   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>> >>> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>> >>> >   at java.lang.Thread.run(Thread.java:662)
>>>> >>> >
>>>> >>> > It usually happened when dropping some new files into the spooling
>>>> >>> > dir, and then it stopped collecting files. Does someone know the
>>>> >>> > reason and how to avoid it?
>>>> >>> >
>>>> >>> > Thanks very much!
>>>> >>> > --
>>>> >>> > Best Regards,
>>>> >>> > Henry Ma
>>>> >>>
>>>> >>> --
>>>> >>> Apache MRUnit - Unit testing MapReduce -
>>>> >>> http://incubator.apache.org/mrunit/
>>>> >>
>>>> >> --
>>>> >> Best Regards,
>>>> >> Henry Ma
>>>> >
>>>> > --
>>>> > Apache MRUnit - Unit testing MapReduce -
>>>> > http://incubator.apache.org/mrunit/
>>>
>>> --
>>> Henry Ma
>
> --
> Henry Ma
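The "copy, then atomically move" pattern Mike describes near the top of the thread can be sketched as a small shell script. All directory and file names below are placeholders (not the paths from the thread); the point is that step 1 may be slow or cross-filesystem, while step 2 is an atomic rename on a single filesystem:

```shell
#!/bin/sh
# Sketch of "copy then atomic move" for feeding a spooling directory.
# Paths are illustrative; use real dirs on the SAME filesystem in practice.
set -e

BASE=$(mktemp -d)
STAGING="$BASE/staging"   # staging dir on the same filesystem as the spool dir
SPOOL="$BASE/spool"       # this would be Flume's spoolDir
mkdir -p "$STAGING" "$SPOOL"

# 1. Write or copy the file into the staging dir first. This step may be
#    slow; the spooling source never sees the partially written file.
printf 'event one\nevent two\n' > "$STAGING/app.log.20130118.rolled"

# 2. Rename into the spool dir. Within one filesystem, mv is an atomic
#    rename, so the file appears to Flume fully formed and immutable.
mv "$STAGING/app.log.20130118.rolled" "$SPOOL/app.log.20130118.rolled"
```

A plain `cp` straight into the spool dir, by contrast, grows the destination file while Flume may already be reading it, which is exactly the "File has changed size since being read" failure above.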
