We have a advertisement system, which owns hundreds of impr-servers, each of them runs service such as resin, and generates log files every seconds. We want to in time collect these log files from each server to a central storage system such as MooseFS for real time analysis, then archive them to HDFS by hour.
What we need is to deploy Flume to collect remote log files from a alterable server-and-path list (the list can be ofen changed, to add or remove collecting location). We hope every log file may be collected almost real time. On Fri, Jan 18, 2013 at 3:45 PM, Mike Percy <[email protected]> wrote: > Can you provide more detail about what kinds of services? > > If you roll the logs every 5 minutes or so then you can configure the > spooling source to pick them up once they are rolled by either rolling them > into a directory for immutable files or using the trunk version of the > spooling file source to specify a filter to ignore files that don't match a > "rolled" pattern. > > You could also use exec source with "tail -F" but that is much more > unreliable than the spooling file source. > > Regards, > Mike > > > On Thu, Jan 17, 2013 at 10:23 PM, Henry Ma <[email protected]>wrote: > >> OK, thank you very much, now I know why the problem occurs. >> >> I am a new comer of Flume. Here is my scenario: using Flume to collecting >> from hundreds of directories from dozens of servers to a central storage. >> It seems that spooling directory source may not be the best choice. Can >> someone give me some advice about how to design the architecture? Which >> type of source and sink can fit? >> >> Thanks! >> >> >> On Fri, Jan 18, 2013 at 2:05 PM, Mike Percy <[email protected]> wrote: >> >>> Hi Henry, >>> The files must be immutable before putting them into the spooling >>> directory. So if you copy them from a different file system then you can >>> run into this issue. The right way to do it is to copy them to the same >>> file system and then atomically move them into the spooling directory. >>> >>> Regards, >>> Mike >>> >>> >>> On Thu, Jan 17, 2013 at 9:59 PM, Henry Ma <[email protected]>wrote: >>> >>>> Thank you very much! I clean all the related dir and restart again. I >>>> keep the source spooling dir empty, then start Flume, and then put some >>>> file into the spooling dir. But this time a new error occured: >>>> >>>> 13/01/18 13:44:24 INFO avro.SpoolingFileLineReader: Preparing to move >>>> file >>>> /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp >>>> to /disk2/mahy/FLUME_TEST/ >>>> source/sspstat.log.20130118112700-20130118112800.hs016.ssp.COMPLETED >>>> 13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught exception >>>> in Runnable >>>> java.lang.IllegalStateException: File has changed size since being >>>> read: >>>> /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp >>>> at >>>> org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:241) >>>> at >>>> org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185) >>>> at >>>> org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135) >>>> at >>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) >>>> at >>>> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) >>>> at >>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) >>>> at java.lang.Thread.run(Thread.java:662) >>>> 13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught exception >>>> in Runnable >>>> java.io.IOException: Stream closed >>>> at java.io.BufferedReader.ensureOpen(BufferedReader.java:97) >>>> at java.io.BufferedReader.readLine(BufferedReader.java:292) >>>> at java.io.BufferedReader.readLine(BufferedReader.java:362) >>>> at >>>> org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180) >>>> at >>>> org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135) >>>> at >>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) >>>> at >>>> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) >>>> at >>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) >>>> at java.lang.Thread.run(Thread.java:662) >>>> 13/01/18 13:44:25 ERROR source.SpoolDirectorySource: Uncaught exception >>>> in Runnable >>>> java.io.IOException: Stream closed >>>> at java.io.BufferedReader.ensureOpen(BufferedReader.java:97) >>>> >>>> >>>> I think it is a typical scenario: Flume is watching some dirs and >>>> collecting new arriving files. I don't know why the exception " File has >>>> changed size since being read" was throwed and how to avoid it. Can you >>>> give some advice and guide? Thanks! >>>> >>>> >>>> On Fri, Jan 18, 2013 at 1:48 PM, Patrick Wendell <[email protected]>wrote: >>>> >>>>> Hey Henry, >>>>> >>>>> The Spooling source assumes that each file is uniquely named. If it >>>>> sees that new file name has arrived that it already processed (and has >>>>> rolled over to a COMPLETED file), it throws an error and shuts down. >>>>> This is to try and prevent sending duplicate data downstream. >>>>> >>>>> Probably the best idea is to clear out the COMPLETED file (and the >>>>> original file, if they are indeed the same one) and restart. >>>>> >>>>> - Patrick >>>>> >>>>> On Thu, Jan 17, 2013 at 9:31 PM, Brock Noland <[email protected]> >>>>> wrote: >>>>> > Hmm, I think this is probaly the root cause. Looks like their was a >>>>> > file with that name already used. >>>>> > >>>>> > 13/01/18 13:16:59 ERROR source.SpoolDirectorySource: Uncaught >>>>> > exception in Runnable >>>>> > java.lang.IllegalStateException: File name has been re-used with >>>>> > different files. Spooling assumption violated for >>>>> > >>>>> /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118100000-20130118100100.hs009.ssp.COMPLETED >>>>> > at >>>>> org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:272) >>>>> > at >>>>> org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185) >>>>> > at >>>>> org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135) >>>>> > at >>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) >>>>> > at >>>>> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) >>>>> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) >>>>> > at >>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) >>>>> > at >>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) >>>>> > at >>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) >>>>> > at >>>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) >>>>> > at >>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) >>>>> > at java.lang.Thread.run(Thread.java:662) >>>>> > >>>>> > On Thu, Jan 17, 2013 at 9:22 PM, Henry Ma <[email protected]> >>>>> wrote: >>>>> >> attached is the log file. >>>>> >> >>>>> >> the content of conf file: >>>>> >> # Name the components on this agent >>>>> >> a1.sources = r1 >>>>> >> a1.sinks = k1 >>>>> >> a1.channels = c1 >>>>> >> >>>>> >> # Describe/configure the source >>>>> >> a1.sources.r1.type = spooldir >>>>> >> a1.sources.r1.spoolDir = /disk2/mahy/FLUME_TEST/source >>>>> >> a1.sources.r1.channels = c1 >>>>> >> >>>>> >> # Describe the sink >>>>> >> a1.sinks.k1.type = file_roll >>>>> >> a1.sinks.k1.sink.directory = /disk2/mahy/FLUME_TEST/sink >>>>> >> a1.sinks.k1.sink.rollInterval = 0 >>>>> >> >>>>> >> # Use a channel which buffers events in memory >>>>> >> a1.channels.c1.type = memory >>>>> >> a1.channels.c1.capacity = 99999 >>>>> >> #a1.channels.c1. = /disk2/mahy/FLUME_TEST/check >>>>> >> #a1.channels.c1.dataDirs = /disk2/mahy/FLUME_TEST/channel-data >>>>> >> >>>>> >> # Bind the source and sink to the channel >>>>> >> a1.sources.r1.channels = c1 >>>>> >> a1.sinks.k1.channel = c1 >>>>> >> >>>>> >> >>>>> >> On Fri, Jan 18, 2013 at 12:39 PM, Brock Noland <[email protected]> >>>>> wrote: >>>>> >>> >>>>> >>> Hi, >>>>> >>> >>>>> >>> Would you mind turning logging to debug and then posting your full >>>>> >>> log/config? >>>>> >>> >>>>> >>> Brock >>>>> >>> >>>>> >>> On Thu, Jan 17, 2013 at 8:24 PM, Henry Ma <[email protected]> >>>>> wrote: >>>>> >>> > Hi, >>>>> >>> > >>>>> >>> > When using Spooling Directory Source in Flume NG 1.3.1, this >>>>> exception >>>>> >>> > happens: >>>>> >>> > >>>>> >>> > 13/01/18 11:37:09 ERROR source.SpoolDirectorySource: Uncaught >>>>> exception >>>>> >>> > in >>>>> >>> > Runnable >>>>> >>> > java.io.IOException: Stream closed >>>>> >>> > at java.io.BufferedReader.ensureOpen(BufferedReader.java:97) >>>>> >>> > at java.io.BufferedReader.readLine(BufferedReader.java:292) >>>>> >>> > at java.io.BufferedReader.readLine(BufferedReader.java:362) >>>>> >>> > at >>>>> >>> > >>>>> >>> > >>>>> org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180) >>>>> >>> > at >>>>> >>> > >>>>> >>> > >>>>> org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135) >>>>> >>> > at >>>>> >>> > >>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) >>>>> >>> > at >>>>> >>> > >>>>> >>> > >>>>> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) >>>>> >>> > at >>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) >>>>> >>> > at >>>>> >>> > >>>>> >>> > >>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) >>>>> >>> > at >>>>> >>> > >>>>> >>> > >>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) >>>>> >>> > at >>>>> >>> > >>>>> >>> > >>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) >>>>> >>> > at >>>>> >>> > >>>>> >>> > >>>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) >>>>> >>> > at >>>>> >>> > >>>>> >>> > >>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) >>>>> >>> > at java.lang.Thread.run(Thread.java:662) >>>>> >>> > >>>>> >>> > It usually happened when dropping some new files into the >>>>> spooling dir, >>>>> >>> > and >>>>> >>> > stop collecting file. Does someone know the reason and how to >>>>> avoid it? >>>>> >>> > >>>>> >>> > Thanks very much! >>>>> >>> > -- >>>>> >>> > Best Regards, >>>>> >>> > Henry Ma >>>>> >>> >>>>> >>> >>>>> >>> >>>>> >>> -- >>>>> >>> Apache MRUnit - Unit testing MapReduce - >>>>> >>> http://incubator.apache.org/mrunit/ >>>>> >> >>>>> >> >>>>> >> >>>>> >> >>>>> >> -- >>>>> >> Best Regards, >>>>> >> Henry Ma >>>>> > >>>>> > >>>>> > >>>>> > -- >>>>> > Apache MRUnit - Unit testing MapReduce - >>>>> http://incubator.apache.org/mrunit/ >>>>> >>>> >>>> >>>> >>>> -- >>>> Henry Ma >>>> >>> >>> >> >> >> -- >> Henry Ma >> > > -- Henry Ma
