Can you try setting this config param for your HDFS Sink: hdfs.useLocalTimeStamp = true
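For reference, a minimal sketch of where that setting would sit in the agent's properties file. The agent name (a1), sink name (k1), channel name (c1) and the HDFS path are placeholders chosen for illustration, not values taken from this thread:

```properties
# Hypothetical names: agent a1, sink k1, channel c1.
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# Example path only; time escapes such as %Y/%m/%d are what make the
# sink look for a timestamp when it builds the bucket path.
a1.sinks.k1.hdfs.path = /flume/events/%Y/%m/%d
# Resolve the escapes from the sink host's clock instead of the
# event's "timestamp" header.
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```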
This should insert the timestamp at the sink into the event (this may not be
what you want - but this will get rid of the event from the channel).

Thanks,
Hari

On Wednesday, August 7, 2013 at 7:14 AM, Jonathan Cooper-Ellis wrote:
> You can use a Static Interceptor before the RegexExtractor to add a timestamp
> of zero to the header, which can then be overwritten by the proper timestamp
> (if it exists). It also should sink misses into an obvious 'miss' directory.
>
> On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <[email protected]> wrote:
> > After some reading in the docs I think the existing fail-over behavior
> > can't be used to solve the 'poison' message problem, as it puts the 'failed'
> > sink in a 'cooldown' period.
> > As the problem is in the message and not the sink, it means that after a
> > poison message has arrived, the HDFS sink will 'fail' and thus the next X
> > messages will go to the failover sink.
> > My only solution for now is to avoid my current problem and hope that I
> > won't have any other problematic messages; I'll be glad to have a less
> > fragile solution.
> >
> > Many thanks!
> > Other than that, Flume looks like a great tool :-)
> >
> > Anat
> >
> > On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <[email protected]> wrote:
> > > I think using a fail-over processor is a very good idea, I think I'll use
> > > it as an immediate solution.
> > > For the long run, I would like to see a general solution (not specific to
> > > file channel, in my case it is an HDFS channel), so the suggestion to add
> > > 'Poison Message' sink to the sink processor sounds good.
> > >
> > > Just FYI, my problem is that a log file going through my source did not
> > > have (in all rows) the structure I expected.
> > >
> > > Since I used regexp extractor to put timestamp, the 'bad' row didn't
> > > match the regexp and the timestamp was not set, then the HDFS sink throws
> > > NPE on that:
> > >
> > > 01 Aug 2013 09:36:24,259 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:422) - process failed
> > > java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
> > >     at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
> > >     at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
> > >     at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
> > >     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
> > >     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > >     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > >     at java.lang.Thread.run(Thread.java:722)
> > > 01 Aug 2013 09:36:24,262 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
> > > org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
> > >     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
> > >     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > >     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > >     at java.lang.Thread.run(Thread.java:722)
> > > Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
> > >     at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
> > >     at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
> > >     at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
> > >     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
> > >     ... 3 more
> > >
> > > I fixed my regexp now, still, I can never be sure all the log files the
> > > system creates will be perfect without any bad lines.
> > >
> > > On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <[email protected]> wrote:
> > > > Just some more thoughts. It could be even easier:
> > > >
> > > > The whole set up might be even easier; for the failover sink processor,
> > > > you have those settings "max attempts" and "time between attempts" and
> > > > it will just try one event X times before it gives up and sends it to
> > > > the next sink. The time between events could even backoff if needed.
> > > >
> > > > The advantage of this is that it preserves the ordering of events,
> > > > something which gets completely broken in the previous scenario.
> > > >
> > > > - Connor
> > > >
> > > > On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <[email protected]> wrote:
> > > > > As another option to solve the problem of having a bad event in a
> > > > > channel: using a fail-over sink processor, log all bad events to a
> > > > > local file. And to be extra cautious, add a third failover of a null
> > > > > sink. This will mean that events will always flow through your
> > > > > channel. The file sink should almost never fail, so you shouldn't be
> > > > > losing events in the process. And then you can re-process everything
> > > > > in the file if you still want those events for something.
> > > > >
> > > > > For the system of having Flume detect bad events, I think
> > > > > implementing something like above is better than discarding events
> > > > > that fail X times. For instance, if you have an Avro sink -> Avro
> > > > > source, and you're restarting your source, Flume would end up
> > > > > discarding events unnecessarily. Instead, how about implementing the
> > > > > above system and then go a step further: Flume will attempt to
> > > > > re-send the bad events itself. And then if a bad event isn't able to
> > > > > be sent after X attempts, it can be discarded.
> > > > >
> > > > > I envision this system as an extension to the current File Channel;
> > > > > when an event fails, it is written to a secondary File Channel from
> > > > > which events can be pulled when the main channel isn't in use. It
> > > > > would add headers like "lastAttempt" and "numberOfAttempts" to
> > > > > events. Then it can be configurable for a "min time between attempts"
> > > > > and "maximum attempts." When an event fails the second time, those
> > > > > headers are updated and it goes back into the fail-channel. If it
> > > > > comes out of the fail-channel but the lastAttempt is too recent, it
> > > > > goes back in.
> > > > > If it fails more times than the maximum, it is written
> > > > > to a final location (perhaps it's just sent to another sink; maybe
> > > > > this would have to be in a sink processor). Assuming all of those
> > > > > steps are error-free, then all messages are preserved, and the
> > > > > badly-formatted ones eventually get stored somewhere else. (This system
> > > > > could be hacked together with current code - fail over sink processor
> > > > > -> avro sink -> avro source on same instance, but that's a little too
> > > > > hacky).
> > > > >
> > > > > Just some thoughts.
> > > > >
> > > > > - Connor
> > > > >
> > > > > On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <[email protected]> wrote:
> > > > > > This sounds like a critical problem that can cause pipelines to
> > > > > > block permanently. If you find yourself in this situation, a
> > > > > > possible work around would be to decommission the channel, remove
> > > > > > its data and route the flow with a new empty channel. If you have
> > > > > > the ability to identify which component is causing the problem,
> > > > > > see if you can remove it temporarily to let the problem events pass
> > > > > > through another peer component.
> > > > > >
> > > > > > I have also created FLUME-2140 [1] which will eventually allow the
> > > > > > pipelines to identify and divert such bad events. If you have any
> > > > > > logs, data, configurations that can be shared and will help provide
> > > > > > more details for this problem, it will be great if you could attach
> > > > > > them to this jira and provide your comments.
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLUME-2140
> > > > > >
> > > > > > Regards,
> > > > > > Arvind Prabhakar
> > > > > >
> > > > > > On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <[email protected]> wrote:
> > > > > > > There's no way to deal with a bad event once it's in the channel,
> > > > > > > but you can mitigate future issues by having a timestamp
> > > > > > > interceptor bound to the source feeding the channel. There is a
> > > > > > > parameter 'preserve existing' that will only add the header if it
> > > > > > > doesn't exist. If you don't want to have 'bad' time data in there
> > > > > > > you could try a static interceptor with a specific past date so
> > > > > > > that corrupt events fall into a deterministic path in HDFS.
> > > > > > >
> > > > > > > I use this technique to prevent stuck events for both timestamp
> > > > > > > headers as well as some of our own custom headers we use for
> > > > > > > tokenized paths. The static interceptor will insert an arbitrary
> > > > > > > header if it doesn't exist so I have a couple that put in the
> > > > > > > value 'Unknown' so that I can still send the events through the
> > > > > > > HDFS sink but I can also find them later if need be.
> > > > > > >
> > > > > > > hope that helps,
> > > > > > > Paul Chavez
> > > > > > >
> > > > > > > From: Roshan Naik [mailto:[email protected]]
> > > > > > > Sent: Thursday, August 01, 2013 10:27 AM
> > > > > > > To: [email protected]
> > > > > > > Subject: Re: Problem Events
> > > > > > >
> > > > > > > some questions:
> > > > > > > - why is the sink unable to consume the event?
> > > > > > > - how would you like to identify such an event? by examining its
> > > > > > >   content? or by the fact that it's ping-pong-ing between channel
> > > > > > >   and sink?
> > > > > > > - what would you prefer to do with such an event? merely drop it?
> > > > > > >
> > > > > > > On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <[email protected]> wrote:
> > > > > > > > To my knowledge (which is admittedly limited), there is no way
> > > > > > > > to deal with these in a way that will make your day. I'm happy
> > > > > > > > if someone can say otherwise.
> > > > > > > >
> > > > > > > > This is very similar to a problem I had a week or two ago. I
> > > > > > > > fixed it by restarting Flume with debugging on, connecting to
> > > > > > > > it with the debugger, and finding the message in the sink.
> > > > > > > > Discovered a bug in the sink. Downloaded Flume, fixed bug,
> > > > > > > > recompiled, installed custom version, etc.
> > > > > > > >
> > > > > > > > I agree that this is not a practical solution, and I still
> > > > > > > > believe that Flume needs some sort of "sink of last resort"
> > > > > > > > option or something, like JMS implementations.
> > > > > > > >
> > > > > > > > -- Jeremy
> > > > > > > >
> > > > > > > > On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <[email protected]> wrote:
> > > > > > > > > The message is already in the channel.
> > > > > > > > > Is there a way to write an interceptor to work after the
> > > > > > > > > channel? or before the sink?
> > > > > > > > >
> > > > > > > > > The only thing I found is to stop everything and delete the
> > > > > > > > > channel files, but I won't be able to use this approach in
> > > > > > > > > production :-(
> > > > > > > > >
> > > > > > > > > On Thu, Aug 1, 2013 at 11:13 AM, Ashish <[email protected]> wrote:
> > > > > > > > > > On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <[email protected]> wrote:
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I'm having the same problem with HDFS sink.
> > > > > > > > > > >
> > > > > > > > > > > A 'poison' message which doesn't have timestamp header in
> > > > > > > > > > > it as the sink expects.
> > > > > > > > > > > This causes a NPE which ends in returning the message to
> > > > > > > > > > > the channel, over and over again.
> > > > > > > > > > >
> > > > > > > > > > > Is my only option to re-write the HDFS sink?
> > > > > > > > > > > Isn't there any way to intercept in the sink work?
> > > > > > > > > >
> > > > > > > > > > You can write a custom interceptor and remove/modify the
> > > > > > > > > > poison message.
> > > > > > > > > >
> > > > > > > > > > Interceptors are called before message makes its way into
> > > > > > > > > > the channel.
> > > > > > > > > >
> > > > > > > > > > http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
> > > > > > > > > >
> > > > > > > > > > I wrote a blog about it a while back
> > > > > > > > > > http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
> > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > > > Anat
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <[email protected]> wrote:
> > > > > > > > > > > > Sounds like a bug in ElasticSearch sink to me. Do you
> > > > > > > > > > > > mind filing a Jira to track this? Sample data to cause
> > > > > > > > > > > > this would be even better.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Arvind Prabhakar
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <[email protected]> wrote:
> > > > > > > > > > > > > This was using the provided ElasticSearch sink. The
> > > > > > > > > > > > > logs were not helpful. I ran it through with the
> > > > > > > > > > > > > debugger and found the source of the problem.
> > > > > > > > > > > > >
> > > > > > > > > > > > > ContentBuilderUtil uses a very "aggressive" method to
> > > > > > > > > > > > > determine if the content is JSON; if it contains a
> > > > > > > > > > > > > "{" anywhere in it, it's considered JSON. My body
> > > > > > > > > > > > > contained that but wasn't JSON, causing the JSON
> > > > > > > > > > > > > parser to throw a CharConversionException from
> > > > > > > > > > > > > addComplexField(...) (but not the expected
> > > > > > > > > > > > > JSONException). We've changed addComplexField(...)
> > > > > > > > > > > > > to catch different types of exceptions and fall back
> > > > > > > > > > > > > to treating it as a simple field. We'll probably
> > > > > > > > > > > > > submit a patch for this soon.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'm reasonably happy with this, but I still think
> > > > > > > > > > > > > that in the bigger picture there should be some sort
> > > > > > > > > > > > > of mechanism to automatically detect and toss / skip
> > > > > > > > > > > > > / flag problematic events without them plugging up
> > > > > > > > > > > > > the flow.
> > > > > > > > > > > > >
> > > > > > > > > > > > > -- Jeremy
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <[email protected]> wrote:
> > > > > > > > > > > > > > Jeremy, would it be possible for you to show us
> > > > > > > > > > > > > > logs for the part where the sink fails to remove an
> > > > > > > > > > > > > > event from the channel? I am assuming this is a
> > > > > > > > > > > > > > standard sink that Flume provides and not a custom
> > > > > > > > > > > > > > one.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The reason I ask is because sinks do not introspect
> > > > > > > > > > > > > > the event, and hence there is no reason why it will
> > > > > > > > > > > > > > fail during the event's removal. It is more likely
> > > > > > > > > > > > > > that there is a problem within the channel in that
> > > > > > > > > > > > > > it cannot dereference the event correctly. Looking
> > > > > > > > > > > > > > at the logs will help us identify the root cause
> > > > > > > > > > > > > > for what you are experiencing.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > Arvind Prabhakar
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <[email protected]> wrote:
> > > > > > > > > > > > > > > Both reasonable suggestions. What would a custom
> > > > > > > > > > > > > > > sink look like in this case, and how would I only
> > > > > > > > > > > > > > > eliminate the problem events since I don't know
> > > > > > > > > > > > > > > what they are until they are attempted by the
> > > > > > > > > > > > > > > "real" sink?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > My philosophical concern (in general) is that
> > > > > > > > > > > > > > > we're taking the approach of exhaustively finding
> > > > > > > > > > > > > > > and eliminating possible failure cases. It's not
> > > > > > > > > > > > > > > possible to eliminate every single failure case,
> > > > > > > > > > > > > > > so shouldn't there be a method of last resort to
> > > > > > > > > > > > > > > eliminate problem events from the channel?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > -- Jeremy
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <[email protected]> wrote:
> > > > > > > > > > > > > > > > Or you could write a custom sink that removes
> > > > > > > > > > > > > > > > this event (more work of course)
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Hari
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
> > > > > > > > > > > > > > > > > if you have a way to identify such events..
> > > > > > > > > > > > > > > > > you may be able to use the Regex interceptor
> > > > > > > > > > > > > > > > > to toss them out before they get into the
> > > > > > > > > > > > > > > > > channel.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <[email protected]> wrote:
> > > > > > > > > > > > > > > > > > Hi everyone. My Flume adventures continue.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I'm in a situation now where I have a
> > > > > > > > > > > > > > > > > > channel that's filling because a stubborn
> > > > > > > > > > > > > > > > > > message is stuck. The sink won't accept it
> > > > > > > > > > > > > > > > > > (for whatever reason; I can go into detail
> > > > > > > > > > > > > > > > > > but that's not my point here).
> > > > > > > > > > > > > > > > > > This just
> > > > > > > > > > > > > > > > > > blocks up the channel entirely, because it
> > > > > > > > > > > > > > > > > > goes back into the channel when the sink
> > > > > > > > > > > > > > > > > > refuses. Obviously, this isn't ideal.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I'm wondering what mechanisms, if any,
> > > > > > > > > > > > > > > > > > Flume has to deal with these situations.
> > > > > > > > > > > > > > > > > > Things that come to mind might be:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 1. Ditch the event after n attempts.
> > > > > > > > > > > > > > > > > > 2. After n attempts, send the event to a
> > > > > > > > > > > > > > > > > >    "problem area" (maybe a different source /
> > > > > > > > > > > > > > > > > >    sink / channel?) that someone can look at
> > > > > > > > > > > > > > > > > >    later.
> > > > > > > > > > > > > > > > > > 3. Some sort of mechanism that allows
> > > > > > > > > > > > > > > > > >    operators to manually kill these messages.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I'm open to suggestions on alternatives as
> > > > > > > > > > > > > > > > > > well.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > -- Jeremy
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > thanks
> > > > > > > > > > ashish
> > > > > > > > > >
> > > > > > > > > > Blog: http://www.ashishpaliwal.com/blog
> > > > > > > > > > My Photo Galleries: http://www.pbase.com/ashishpaliwal
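
For anyone landing on this thread later, here is a rough sketch of how the source-side mitigations quoted above (regex extractor, then a timestamp interceptor that only fills in a missing header, then a static 'Unknown' header) and the failover chain Connor described (HDFS, then a local file, then a null sink) might be written as agent configuration. All component names (a1, r1, c1, k1..k3, g1), the regex, the date pattern, the header key and the directory are assumptions made up for illustration; none of them come from the posters' actual setups. Note also Anat's caveat above: the failover processor penalizes the whole sink after a failure, so it does not isolate a single poison event.

```properties
# Hypothetical names throughout: agent a1, source r1, channel c1,
# sinks k1 (HDFS), k2 (local file), k3 (null), sink group g1.

# --- Source-side interceptors, applied in the order listed ---
a1.sources.r1.interceptors = ts_extract ts_fallback tag_fallback

# 1. Try to extract the timestamp from the event body.
#    Placeholder regex/pattern; adjust to the real log format.
a1.sources.r1.interceptors.ts_extract.type = regex_extractor
a1.sources.r1.interceptors.ts_extract.regex = ^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.ts_extract.serializers = s1
a1.sources.r1.interceptors.ts_extract.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.ts_extract.serializers.s1.name = timestamp
a1.sources.r1.interceptors.ts_extract.serializers.s1.pattern = yyyy-MM-dd HH:mm

# 2. If the regex did not match, no timestamp header exists yet, so
#    stamp the event with the current time; leave extracted values alone.
a1.sources.r1.interceptors.ts_fallback.type = timestamp
a1.sources.r1.interceptors.ts_fallback.preserveExisting = true

# 3. Same idea for a custom header used in tokenized paths
#    ("logtype" is an invented key).
a1.sources.r1.interceptors.tag_fallback.type = static
a1.sources.r1.interceptors.tag_fallback.key = logtype
a1.sources.r1.interceptors.tag_fallback.value = Unknown
a1.sources.r1.interceptors.tag_fallback.preserveExisting = true

# --- Failover chain: highest priority is tried first ---
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.priority.k3 = 1

# Local file sink for events the HDFS sink rejected (example directory).
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c1
a1.sinks.k2.sink.directory = /var/log/flume/failed

# Last resort: discard, so the channel always drains.
a1.sinks.k3.type = null
a1.sinks.k3.channel = c1
```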
