Thank you very much, Connor!! It is really HELPFUL.
On Fri, Jan 18, 2013 at 5:13 PM, Connor Woodson <[email protected]> wrote:

> The Spooling Directory Source is best used for sending old data / backups
> through Flume, as opposed to real-time data: as you discovered, you are
> not supposed to write directly to a file in that directory, but rather
> place closed files there. You could implement what Mike mentioned above
> about rolling the logs into the spooling directory, but there are other
> options.
>
> If you are looking to pull data in real time, the Exec Source
> <http://flume.apache.org/FlumeUserGuide.html#exec-source> mentioned above
> does work. The one downside is that this source is not the most reliable,
> as is mentioned in the red box at that link, and you will have to monitor
> it to make sure it hasn't crashed. However, other than the Spooling
> Directory Source and any custom source you write, it is the only other
> pulling source.
>
> But depending on how your system is set up, you could push your logs into
> Flume instead. Here are some options:
>
> If the log files you want to capture use Log4J, there is a Log4J Appender
> <http://flume.apache.org/FlumeUserGuide.html#log4j-appender> which will
> send events directly to Flume. The benefit is that you let Flume take
> control of the events right as they are generated; they are sent via Avro
> to your specified host/port, where you run a Flume agent with an Avro
> Source <http://flume.apache.org/FlumeUserGuide.html#flume-sources>.
>
> If you don't use Log4J but you do have direct control over the
> application, another alternative is the Embedded Flume Agent
> <https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst#embedded-agent>.
> This is even more powerful than the Log4J appender, as you have more
> control over how it works and you are able to use Flume channels with it.
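The Log4J appender route Connor describes might look like the following `log4j.properties` fragment (a minimal sketch: the hostname and port are placeholders, and the appender class and property names are the ones documented in the Flume user guide):

```properties
# Route application logs straight into a remote Flume Avro source.
log4j.rootLogger = INFO, flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
# Placeholder host; must point at the agent running the Avro source.
log4j.appender.flume.Hostname = collector.example.com
# Must match the Avro source's configured port.
log4j.appender.flume.Port = 41414
```

The application also needs the Flume log4j appender jar (from the flume-ng-clients modules) on its classpath for the appender class to resolve.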
> This would end up pushing events via Avro to your Flume agent to then
> collect/process/store.
>
> There are also a variety of network methods that can communicate with
> Flume. Flume supports listening on a specified port with the Netcat
> Source <http://flume.apache.org/FlumeUserGuide.html#netcat-source>,
> getting events via HTTP POST
> <http://flume.apache.org/FlumeUserGuide.html#http-source> messages, and,
> if your application uses Syslog, that's supported
> <http://flume.apache.org/FlumeUserGuide.html#syslog-sources> as well.
>
> In summation: if you need to set up a pulling system, you will need to
> place a Flume agent on each of your servers and have it use a Spooling
> Directory or Exec source; or, if your system is configurable enough, you
> will be able to modify it in various ways to push the logs to Flume.
>
> I hope some of that was helpful,
>
> - Connor
>
> On Fri, Jan 18, 2013 at 12:18 AM, Henry Ma <[email protected]> wrote:
>
>> We have an advertisement system which owns hundreds of servers running
>> services such as Resin/Nginx, and each of them generates log files to a
>> local location every second. What we need is to collect all the log
>> files in time to a central storage such as MooseFS for real-time
>> analysis, and then archive them to HDFS by hour.
>>
>> We want to deploy Flume to collect log files as soon as they are
>> generated from nearly one hundred servers (servers may be added or
>> removed from the list at any time) to a central location, and then
>> archive them to HDFS each hour.
>>
>> For now, the log files cannot be pushed to any collecting system; we
>> need the collecting system to PULL all of them remotely.
>>
>> Can you give me some guidance? Thanks!
>>
>> On Fri, Jan 18, 2013 at 3:45 PM, Mike Percy <[email protected]> wrote:
>>
>>> Can you provide more detail about what kinds of services?
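Connor's summation (one agent per server pulling from a spool directory and pushing over Avro to a central collector, which archives to HDFS hourly) might be sketched as two configs like these. All hostnames, ports, and paths are placeholders; the property names follow the Flume user guide, and `hdfs.useLocalTimeStamp` may require a build newer than 1.3.1:

```properties
# --- per-server agent: watch a spool dir, forward over Avro ---
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/rolled
a1.sources.r1.channels = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = collector.example.com
a1.sinks.k1.port = 4545
a1.sinks.k1.channel = c1
a1.channels.c1.type = file

# --- central collector: receive Avro, archive to HDFS by hour ---
agent2.sources = r1
agent2.channels = ch1
agent2.sinks = hdfs1
agent2.sources.r1.type = avro
agent2.sources.r1.bind = 0.0.0.0
agent2.sources.r1.port = 4545
agent2.sources.r1.channels = ch1
agent2.sinks.hdfs1.type = hdfs
# %Y%m%d/%H buckets events into one directory per hour.
agent2.sinks.hdfs1.hdfs.path = hdfs://namenode/logs/%Y%m%d/%H
agent2.sinks.hdfs1.hdfs.fileType = DataStream
agent2.sinks.hdfs1.hdfs.rollInterval = 3600
agent2.sinks.hdfs1.hdfs.useLocalTimeStamp = true
agent2.sinks.hdfs1.channel = ch1
agent2.channels.ch1.type = file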
>>>
>>> If you roll the logs every 5 minutes or so, then you can configure the
>>> spooling source to pick them up once they are rolled, either by rolling
>>> them into a directory reserved for immutable files or by using the
>>> trunk version of the spooling file source to specify a filter that
>>> ignores files which don't match a "rolled" pattern.
>>>
>>> You could also use the Exec source with "tail -F", but that is much
>>> more unreliable than the spooling file source.
>>>
>>> Regards,
>>> Mike
>>>
>>> On Thu, Jan 17, 2013 at 10:23 PM, Henry Ma <[email protected]> wrote:
>>>
>>>> OK, thank you very much; now I know why the problem occurs.
>>>>
>>>> I am a newcomer to Flume. Here is my scenario: using Flume to collect
>>>> from hundreds of directories on dozens of servers to a central
>>>> storage. It seems that the spooling directory source may not be the
>>>> best choice. Can someone give me some advice about how to design the
>>>> architecture? Which types of source and sink would fit?
>>>>
>>>> Thanks!
>>>>
>>>> On Fri, Jan 18, 2013 at 2:05 PM, Mike Percy <[email protected]> wrote:
>>>>
>>>>> Hi Henry,
>>>>> The files must be immutable before you put them into the spooling
>>>>> directory, so if you copy them from a different file system you can
>>>>> run into this issue. The right way to do it is to copy them to the
>>>>> same file system and then atomically move them into the spooling
>>>>> directory.
>>>>>
>>>>> Regards,
>>>>> Mike
>>>>>
>>>>> On Thu, Jan 17, 2013 at 9:59 PM, Henry Ma <[email protected]> wrote:
>>>>>
>>>>>> Thank you very much! I cleaned all the related dirs and restarted.
>>>>>> I kept the source spooling dir empty, then started Flume, and then
>>>>>> put some files into the spooling dir.
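Mike's "copy to the same file system, then atomically move" advice can be sketched in shell. The paths and file names here are made up for illustration; the point is that `mv` within a single filesystem is an atomic rename, so Flume sees either nothing or a complete, immutable file:

```shell
# Hypothetical layout: the staging dir sits on the SAME filesystem
# as the spool dir, so the final mv is an atomic rename.
BASE=$(mktemp -d)
SPOOL_DIR="$BASE/spool"     # directory the Flume spooldir source watches
STAGE_DIR="$BASE/staging"   # intermediate drop point
mkdir -p "$SPOOL_DIR" "$STAGE_DIR"

# Simulate a rolled, closed log file arriving from elsewhere.
echo "some log line" > "$BASE/app.log.rolled"

# 1. Copy into the staging dir first; a copy is not atomic, and Flume
#    must never observe a file while it is still growing.
cp "$BASE/app.log.rolled" "$STAGE_DIR/app.log.20130118.ssp"

# 2. mv within one filesystem is an atomic rename: the file appears in
#    the spool dir fully formed, never partially written.
mv "$STAGE_DIR/app.log.20130118.ssp" "$SPOOL_DIR/app.log.20130118.ssp"
```

Copying straight into the spool directory (or writing there in place) is exactly what triggers the "File has changed size since being read" error further down this thread.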
>>>>>> But this time a new error occurred:
>>>>>>
>>>>>> 13/01/18 13:44:24 INFO avro.SpoolingFileLineReader: Preparing to move file /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp to /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp.COMPLETED
>>>>>> 13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>>>>> java.lang.IllegalStateException: File has changed size since being read: /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp
>>>>>>     at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:241)
>>>>>>     at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
>>>>>>     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>     at java.lang.Thread.run(Thread.java:662)
>>>>>> 13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>>>>> java.io.IOException: Stream closed
>>>>>>     at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>>>>>     at java.io.BufferedReader.readLine(BufferedReader.java:292)
>>>>>>     at java.io.BufferedReader.readLine(BufferedReader.java:362)
>>>>>>     at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180)
>>>>>>     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>     at java.lang.Thread.run(Thread.java:662)
>>>>>> 13/01/18 13:44:25 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>>>>> java.io.IOException: Stream closed
>>>>>>     at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>>>>>
>>>>>> I think this is a typical scenario: Flume is watching some dirs and
>>>>>> collecting newly arriving files. I don't know why the exception
>>>>>> "File has changed size since being read" was thrown, or how to avoid
>>>>>> it. Can you give some advice and guidance? Thanks!
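One way to avoid the "File has changed size since being read" error, in the spirit of Mike's note above about the trunk version's filter, is to have writers create files in the spool directory under a temporary name and rename them only once closed, with the source configured to ignore the temporary names. This is a sketch against a post-1.3.1 spooling source that supports an ignore pattern; the `.tmp` suffix convention is an assumption:

```properties
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /disk2/mahy/FLUME_TEST/source
# Writers create files as *.tmp and rename them when closed, so the
# source never reads a file while it is still growing.
a1.sources.r1.ignorePattern = ^.*\.tmp$
```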
>>>>>>
>>>>>> On Fri, Jan 18, 2013 at 1:48 PM, Patrick Wendell <[email protected]> wrote:
>>>>>>
>>>>>>> Hey Henry,
>>>>>>>
>>>>>>> The Spooling source assumes that each file is uniquely named. If it
>>>>>>> sees that a file name has arrived which it has already processed
>>>>>>> (and has rolled over to a COMPLETED file), it throws an error and
>>>>>>> shuts down. This is to try to prevent sending duplicate data
>>>>>>> downstream.
>>>>>>>
>>>>>>> Probably the best idea is to clear out the COMPLETED file (and the
>>>>>>> original file, if they are indeed the same one) and restart.
>>>>>>>
>>>>>>> - Patrick
>>>>>>>
>>>>>>> On Thu, Jan 17, 2013 at 9:31 PM, Brock Noland <[email protected]> wrote:
>>>>>>> > Hmm, I think this is probably the root cause. It looks like there
>>>>>>> > was a file with that name already used.
>>>>>>> >
>>>>>>> > 13/01/18 13:16:59 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>>>>>> > java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumption violated for /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118100000-20130118100100.hs009.ssp.COMPLETED
>>>>>>> >     at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:272)
>>>>>>> >     at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
>>>>>>> >     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>>>>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>> >     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>>>>> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>>>>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>>>>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>>>>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>>>>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>> >     at java.lang.Thread.run(Thread.java:662)
>>>>>>> >
>>>>>>> > On Thu, Jan 17, 2013 at 9:22 PM, Henry Ma <[email protected]> wrote:
>>>>>>> >> Attached is the log file.
>>>>>>> >>
>>>>>>> >> The content of the conf file:
>>>>>>> >>
>>>>>>> >> # Name the components on this agent
>>>>>>> >> a1.sources = r1
>>>>>>> >> a1.sinks = k1
>>>>>>> >> a1.channels = c1
>>>>>>> >>
>>>>>>> >> # Describe/configure the source
>>>>>>> >> a1.sources.r1.type = spooldir
>>>>>>> >> a1.sources.r1.spoolDir = /disk2/mahy/FLUME_TEST/source
>>>>>>> >> a1.sources.r1.channels = c1
>>>>>>> >>
>>>>>>> >> # Describe the sink
>>>>>>> >> a1.sinks.k1.type = file_roll
>>>>>>> >> a1.sinks.k1.sink.directory = /disk2/mahy/FLUME_TEST/sink
>>>>>>> >> a1.sinks.k1.sink.rollInterval = 0
>>>>>>> >>
>>>>>>> >> # Use a channel which buffers events in memory
>>>>>>> >> a1.channels.c1.type = memory
>>>>>>> >> a1.channels.c1.capacity = 99999
>>>>>>> >> #a1.channels.c1. = /disk2/mahy/FLUME_TEST/check
>>>>>>> >> #a1.channels.c1.dataDirs = /disk2/mahy/FLUME_TEST/channel-data
>>>>>>> >>
>>>>>>> >> # Bind the source and sink to the channel
>>>>>>> >> a1.sources.r1.channels = c1
>>>>>>> >> a1.sinks.k1.channel = c1
>>>>>>> >>
>>>>>>> >> On Fri, Jan 18, 2013 at 12:39 PM, Brock Noland <[email protected]> wrote:
>>>>>>> >>>
>>>>>>> >>> Hi,
>>>>>>> >>>
>>>>>>> >>> Would you mind turning logging to debug and then posting your
>>>>>>> >>> full log/config?
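The commented-out lines in Henry's config hint at an earlier experiment with the file channel. For durability across agent restarts, a file channel sketch would look like this (the property names are the documented file channel ones; the directories simply reuse the paths already in the config above, and are otherwise arbitrary):

```properties
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /disk2/mahy/FLUME_TEST/check
a1.channels.c1.dataDirs = /disk2/mahy/FLUME_TEST/channel-data
```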
>>>>>>> >>>
>>>>>>> >>> Brock
>>>>>>> >>>
>>>>>>> >>> On Thu, Jan 17, 2013 at 8:24 PM, Henry Ma <[email protected]> wrote:
>>>>>>> >>> > Hi,
>>>>>>> >>> >
>>>>>>> >>> > When using the Spooling Directory Source in Flume NG 1.3.1,
>>>>>>> >>> > this exception happens:
>>>>>>> >>> >
>>>>>>> >>> > 13/01/18 11:37:09 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>>>>>> >>> > java.io.IOException: Stream closed
>>>>>>> >>> >     at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>>>>>> >>> >     at java.io.BufferedReader.readLine(BufferedReader.java:292)
>>>>>>> >>> >     at java.io.BufferedReader.readLine(BufferedReader.java:362)
>>>>>>> >>> >     at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180)
>>>>>>> >>> >     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>>>>> >>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>> >>> >     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>>>>> >>> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>>>>> >>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>>>>> >>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>>>>> >>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>>>>> >>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>> >>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>> >>> >     at java.lang.Thread.run(Thread.java:662)
>>>>>>> >>> >
>>>>>>> >>> > It usually happens when dropping some new files into the
>>>>>>> >>> > spooling dir, after which it stops collecting files. Does
>>>>>>> >>> > someone know the reason and how to avoid it?
>>>>>>> >>> >
>>>>>>> >>> > Thanks very much!
>>>>>>> >>> > --
>>>>>>> >>> > Best Regards,
>>>>>>> >>> > Henry Ma
>>>>>>> >>>
>>>>>>> >>> --
>>>>>>> >>> Apache MRUnit - Unit testing MapReduce -
>>>>>>> >>> http://incubator.apache.org/mrunit/
>>>>>>> >>
>>>>>>> >> --
>>>>>>> >> Best Regards,
>>>>>>> >> Henry Ma
>>>>>>> >
>>>>>>> > --
>>>>>>> > Apache MRUnit - Unit testing MapReduce -
>>>>>>> > http://incubator.apache.org/mrunit/
>>>>>>
>>>>>> --
>>>>>> Henry Ma
>>>>
>>>> --
>>>> Henry Ma
>>
>> --
>> Best Regards,
>> 马环宇 (Henry Ma)
>> 网易有道 (NetEase Youdao) EAD-Platform
>> POPO: [email protected]
>> MSN: [email protected]
>> MOBILE: 18600601996

--
Best Regards,
马环宇 (Henry Ma)
网易有道 (NetEase Youdao) EAD-Platform
POPO: [email protected]
MSN: [email protected]
MOBILE: 18600601996
