To avoid the cooldown period, set maxBackoff to 0. The failover sink processor ought to have smarter logic around this, but with that setting the following happens, at the cost of a little extra processing time:
an event fails -> that sink is put on the failover list -> the event goes to the next sink and succeeds. Next event -> the failover processor checks the failover list and removes the first sink from it -> the event goes to the first sink.

I am fairly confident that's how it works.

- Connor

On Wed, Aug 7, 2013 at 10:26 PM, Anat Rozenzon <[email protected]> wrote:

> That's what I'll do: add a static timestamp of 1 and let all the 'bad' messages flow into one directory.
> Thanks
>
>
> On Wed, Aug 7, 2013 at 5:14 PM, Jonathan Cooper-Ellis <[email protected]> wrote:
>
>> You can use a Static Interceptor before the RegexExtractor to add a timestamp of zero to the header, which can then be overwritten by the proper timestamp (if it exists). That also sinks misses into an obvious 'miss' directory.
>>
>>
>> On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <[email protected]> wrote:
>>
>>> After some reading in the docs, I think the existing fail-over behavior can't be used to solve the 'poison' message problem, as it puts the 'failed' sink in a 'cooldown' period.
>>> As the problem is in the message and not the sink, it means that after a poison message has arrived, the HDFS sink will 'fail' and thus the next X messages will go to the failover sink.
>>> My only solution for now is to avoid my current problem and hope that I won't have any other problematic messages; I'd be glad to have a less fragile solution.
>>>
>>> Many thanks!
>>> Other than that, Flume looks like a great tool :-)
>>>
>>> Anat
>>>
>>>
>>> On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <[email protected]> wrote:
>>>
>>>> I think using a fail-over processor is a very good idea; I think I'll use it as an immediate solution.
>>>> For the long run, I would like to see a general solution (not specific to the file channel; in my case it is an HDFS sink), so the suggestion to add a 'Poison Message' sink to the sink processor sounds good.
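[Editor's note: the static-interceptor-plus-regex-extractor arrangement agreed on above might look roughly like the following in a Flume properties file. This is a sketch, not a configuration from the thread: the agent/source names and the regex are illustrative, and it assumes the log line starts with an epoch-milliseconds value.]

```properties
a1.sources.r1.interceptors = default_ts extract_ts

# Fallback first: give every event timestamp=1 so rows the regex misses
# still bucket into one deterministic (epoch-1970) HDFS directory
# instead of crashing the HDFS sink with an NPE.
a1.sources.r1.interceptors.default_ts.type = static
a1.sources.r1.interceptors.default_ts.key = timestamp
a1.sources.r1.interceptors.default_ts.value = 1

# Rows that match get their real timestamp written over the fallback.
a1.sources.r1.interceptors.extract_ts.type = regex_extractor
a1.sources.r1.interceptors.extract_ts.regex = ^(\\d{13})
a1.sources.r1.interceptors.extract_ts.serializers = s1
a1.sources.r1.interceptors.extract_ts.serializers.s1.name = timestamp
```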
>>>>
>>>> Just FYI, my problem is that a log file going through my source did not have (in all rows) the structure I expected.
>>>>
>>>> Since I used the regex extractor to set the timestamp, the 'bad' row didn't match the regexp and the timestamp was not set; the HDFS sink then throws an NPE on that:
>>>>
>>>> 01 Aug 2013 09:36:24,259 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:422) - process failed
>>>> java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>         at java.lang.Thread.run(Thread.java:722)
>>>> 01 Aug 2013 09:36:24,262 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
>>>> org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>         at java.lang.Thread.run(Thread.java:722)
>>>> Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>>         ... 3 more
>>>>
>>>>
>>>> I have fixed my regexp now; still, I can never be sure all the log files the system creates will be perfect, without any bad lines.
>>>>
>>>>
>>>> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <[email protected]> wrote:
>>>>
>>>>> Just some more thoughts. It could be even easier:
>>>>>
>>>>> The whole setup might be even simpler: give the failover sink processor settings like "max attempts" and "time between attempts", and it will just try one event X times before it gives up and sends it to the next sink. The time between attempts could even back off if needed.
>>>>>
>>>>> The advantage of this is that it preserves the ordering of events, something which gets completely broken in the previous scenario.
>>>>>
>>>>> - Connor
>>>>>
>>>>>
>>>>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <[email protected]> wrote:
>>>>>
>>>>>> As another option to solve the problem of having a bad event in a channel: using a fail-over sink processor, log all bad events to a local file. And to be extra cautious, add a third failover of a null sink. This will mean that events will always flow through your channel. The file sink should almost never fail, so you shouldn't be losing events in the process. And then you can re-process everything in the file if you still want those events for something.
>>>>>>
>>>>>> For the system of having Flume detect bad events, I think implementing something like the above is better than discarding events that fail X times. For instance, if you have an Avro sink -> Avro source and you're restarting your source, Flume would end up discarding events unnecessarily. Instead, how about implementing the above system and then going a step further: Flume will attempt to re-send the bad events itself, and if a bad event can't be sent after X attempts, it can be discarded.
>>>>>>
>>>>>> I envision this system as an extension to the current File Channel: when an event fails, it is written to a secondary File Channel from which events can be pulled when the main channel isn't in use. It would add headers like "lastAttempt" and "numberOfAttempts" to events. It could then be configurable with a "min time between attempts" and a "maximum attempts". When an event fails a second time, those headers are updated and it goes back into the fail-channel. If it comes out of the fail-channel but the lastAttempt is too recent, it goes back in.
>>>>>> If it fails more times than the maximum, it is written to a final location (perhaps it's just sent to another sink; maybe this would have to be in a sink processor). Assuming all of those steps are error-free, all messages are preserved, and the badly-formatted ones eventually get stored somewhere else. (This system could be hacked together with current code - failover sink processor -> Avro sink -> Avro source on the same instance - but that's a little too hacky.)
>>>>>>
>>>>>> Just some thoughts.
>>>>>>
>>>>>> - Connor
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <[email protected]> wrote:
>>>>>>
>>>>>>> This sounds like a critical problem that can cause pipelines to block permanently. If you find yourself in this situation, a possible workaround would be to decommission the channel, remove its data, and route the flow through a new empty channel. If you can identify which component is causing the problem, see if you can remove it temporarily to let the problem events pass through another peer component.
>>>>>>>
>>>>>>> I have also created FLUME-2140 [1], which will eventually allow pipelines to identify and divert such bad events. If you have any logs, data, or configurations that can be shared and will help provide more details for this problem, it would be great if you could attach them to this jira and provide your comments.
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>>>>
>>>>>>> Regards,
>>>>>>> Arvind Prabhakar
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <[email protected]> wrote:
>>>>>>>
>>>>>>>> There's no way to deal with a bad event once it's in the channel, but you can mitigate future issues by having a timestamp interceptor bound to the source feeding the channel. There is a parameter 'preserve existing' that will only add the header if it doesn't exist. If you don't want to have 'bad' time data in there, you could try a static interceptor with a specific past date so that corrupt events fall into a deterministic path in HDFS.
>>>>>>>>
>>>>>>>> I use this technique to prevent stuck events, both for timestamp headers and for some of our own custom headers we use for tokenized paths. The static interceptor will insert an arbitrary header if it doesn't exist, so I have a couple that put in the value 'Unknown'; that way I can still send the events through the HDFS sink but can also find them later if need be.
>>>>>>>>
>>>>>>>> Hope that helps,
>>>>>>>> Paul Chavez
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> *From:* Roshan Naik [mailto:[email protected]]
>>>>>>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>>>>>>> *To:* [email protected]
>>>>>>>> *Subject:* Re: Problem Events
>>>>>>>>
>>>>>>>> Some questions:
>>>>>>>> - Why is the sink unable to consume the event?
>>>>>>>> - How would you like to identify such an event? By examining its content, or by the fact that it's ping-ponging between channel and sink?
>>>>>>>> - What would you prefer to do with such an event? Merely drop it?
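[Editor's note: Paul's interceptor approach above might look roughly like this as Flume properties. This is a sketch, not a configuration from the thread: the component names, the `logtype` header, and the HDFS path are all illustrative.]

```properties
a1.sources.r1.interceptors = ts tag

# Add a timestamp header only when one is not already present, so events
# that missed upstream extraction still bucket deterministically.
a1.sources.r1.interceptors.ts.type = timestamp
a1.sources.r1.interceptors.ts.preserveExisting = true

# Default a custom routing header to 'Unknown' so misses flow through
# the HDFS sink but remain easy to find later.
a1.sources.r1.interceptors.tag.type = static
a1.sources.r1.interceptors.tag.preserveExisting = true
a1.sources.r1.interceptors.tag.key = logtype
a1.sources.r1.interceptors.tag.value = Unknown

# Both headers feed the bucket path, so 'bad' events land under
# /flume/Unknown/... instead of blocking the channel.
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/%{logtype}/%Y-%m-%d
```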
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> To my knowledge (which is admittedly limited), there is no way to deal with these that will make your day. I'm happy if someone can say otherwise.
>>>>>>>>>
>>>>>>>>> This is very similar to a problem I had a week or two ago. I fixed it by restarting Flume with debugging on, connecting to it with the debugger, and finding the message in the sink. Discovered a bug in the sink. Downloaded Flume, fixed the bug, recompiled, installed the custom version, etc.
>>>>>>>>>
>>>>>>>>> I agree that this is not a practical solution, and I still believe that Flume needs some sort of "sink of last resort" option or something, as JMS implementations have.
>>>>>>>>>
>>>>>>>>> -- Jeremy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> The message is already in the channel.
>>>>>>>>>> Is there a way to write an interceptor to work after the channel, or before the sink?
>>>>>>>>>>
>>>>>>>>>> The only thing I found is to stop everything and delete the channel files, but I won't be able to use this approach in production :-(
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm having the same problem with the HDFS sink.
>>>>>>>>>>>>
>>>>>>>>>>>> A 'poison' message which doesn't have the timestamp header in it, as the sink expects.
>>>>>>>>>>>> This causes an NPE, which ends in returning the message to the channel, over and over again.
>>>>>>>>>>>>
>>>>>>>>>>>> Is my only option to re-write the HDFS sink?
>>>>>>>>>>>> Isn't there any way to intercept the sink's work?
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> You can write a custom interceptor and remove/modify the poison message.
>>>>>>>>>>>
>>>>>>>>>>> Interceptors are called before a message makes its way into the channel.
>>>>>>>>>>>
>>>>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>>>>
>>>>>>>>>>> I wrote a blog about it a while back:
>>>>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Anat
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Sounds like a bug in the ElasticSearch sink to me. Do you mind filing a Jira to track this? Sample data to cause this would be even better.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This was using the provided ElasticSearch sink. The logs were not helpful. I ran it through with the debugger and found the source of the problem.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine if the content is JSON: if it contains a "{" anywhere in it, it's considered JSON. My body contained that but wasn't JSON, causing the JSON parser to throw a CharConversionException from addComplexField(...) (but not the expected JSONException). We've changed addComplexField(...)
>>>>>>>>>>>>>> to catch different types of exceptions and fall back to treating it as a simple field. We'll probably submit a patch for this soon.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm reasonably happy with this, but I still think that, in the bigger picture, there should be some sort of mechanism to automatically detect and toss / skip / flag problematic events without them plugging up the flow.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the part where the sink fails to remove an event from the channel? I am assuming this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The reason I ask is that sinks do not introspect the event, and hence there is no reason why one would fail during the event's removal. It is more likely that there is a problem within the channel, in that it cannot dereference the event correctly. Looking at the logs will help us identify the root cause of what you are experiencing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Both reasonable suggestions. What would a custom sink look like in this case, and how would I eliminate only the problem events, since I don't know what they are until they are attempted by the "real" sink?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My philosophical concern (in general) is that we're taking the approach of exhaustively finding and eliminating possible failure cases. It's not possible to eliminate every single failure case, so shouldn't there be a method of last resort to eliminate problem events from the channel?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Or you could write a custom sink that removes this event (more work, of course).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you have a way to identify such events, you may be able to use the regex interceptor to toss them out before they get into the channel.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <[email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi everyone. My Flume adventures continue.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's filling up because a stubborn message is stuck. The sink won't accept it (for whatever reason; I can go into detail, but that's not my point here). This just blocks up the channel entirely, because the event goes back into the channel when the sink refuses it. Obviously, this isn't ideal.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with these situations. Things that come to mind might be:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe a different source / sink / channel?) that someone can look at later.
>>>>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually kill these messages.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> thanks
>>>>>>>>>>> ashish
>>>>>>>>>>>
>>>>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
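[Editor's note: pulling the thread's suggestions together, the failover chain Connor describes (HDFS primary, local file backup, null sink as last resort) can be sketched as Flume properties. Names, priorities, and paths are placeholders; `maxpenalty` is the failover processor's cooldown cap, which appears to be what the "maxBackoff" remark at the top refers to.]

```properties
a1.sinks = hdfs1 backup1 null1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = hdfs1 backup1 null1
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.hdfs1 = 10
a1.sinkgroups.g1.processor.priority.backup1 = 5
a1.sinkgroups.g1.processor.priority.null1 = 1
# Upper bound (ms) on how long a failed sink is penalized before retry;
# keep it small so a poison event doesn't divert many good events.
a1.sinkgroups.g1.processor.maxpenalty = 1000

# Primary: time-bucketed HDFS writes.
a1.sinks.hdfs1.type = hdfs
a1.sinks.hdfs1.channel = c1
a1.sinks.hdfs1.hdfs.path = /flume/events/%Y-%m-%d

# Backup: bad events land in a local file for later re-processing.
a1.sinks.backup1.type = file_roll
a1.sinks.backup1.channel = c1
a1.sinks.backup1.sink.directory = /var/log/flume/failed

# Last resort: discard, so the channel can always drain.
a1.sinks.null1.type = null
a1.sinks.null1.channel = c1
```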
