Yes, an AssignerWithPeriodicWatermarks is periodically called. The interval is configured with StreamExecutionEnvironment.getConfig().setAutoWatermarkInterval(). When you configure event-time mode (setStreamTimeCharacteristic(TimeCharacteristic.EventTime)), the default is 200ms.

Best, Fabian
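[For illustration, a minimal periodic assigner along the lines Fabian describes might look like the sketch below. LogLine and its getTimestamp() accessor are placeholders for the actual record type, and the one-minute safety margin is an arbitrary choice:]

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    // Extracts the event timestamp from each record and, when polled by the
    // runtime, emits a watermark trailing the max seen timestamp by a minute.
    public class LogLineWatermarkAssigner implements AssignerWithPeriodicWatermarks<LogLine> {

        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(LogLine element, long previousElementTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
            return element.getTimestamp();
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Called by the runtime every autoWatermarkInterval milliseconds.
            return new Watermark(maxTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE : maxTimestamp - 60_000L);
        }
    }

[The polling interval itself is set on the environment:]

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // interval defaults to 200ms
    env.getConfig().setAutoWatermarkInterval(1000L); // poll getCurrentWatermark() once per second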
2017-03-17 0:14 GMT+01:00 Paul Smith <psm...@aconex.com>:

> Due to the slightly out-of-sequence log timestamps, I tried switching to a
> "BoundedOutOfOrdernessTimestampExtractor" and used a minute as the
> threshold, but I still couldn't get the watermarks to fire. Setting
> breakpoints and trying to follow the code, I can't see where
> getCurrentWatermark() is being called..?
>
> Is that done via a periodic timer? Is the autoWatermarkInterval (which is
> using the default) responsible, as a periodic "poller" of what the
> watermarks are, for triggering things?
>
> Paul
>
> *From: *Fabian Hueske <fhue...@gmail.com>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Friday, 17 March 2017 at 9:11 am
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Batch stream Sink delay ?
>
> Actually, I think the program you shared in the first mail of this thread
> looks fine for the purpose you describe.
>
> Timestamp and watermark assignment works as follows:
>
> - For each record, a long timestamp is extracted (UNIX/epoch timestamp).
> - A watermark is a timestamp which says that no more records with a
> timestamp lower than the watermark will be processed. Records which
> violate that condition are dropped by default.
> - So the watermark should follow the max emitted record timestamp minus a
> safety margin.
> - If you use a periodic watermark assigner, the watermark function is
> periodically called.
>
> Best, Fabian
>
> 2017-03-16 22:54 GMT+01:00 Paul Smith <psm...@aconex.com>:
>
> I have managed to discover that my presumption of a log4j log file being a
> *guaranteed* sequential order of time is incorrect (race conditions). So
> some logs are out of sequence, and I was getting *some* monotonic
> timestamp violations. I did not discover this because somehow my local
> Flink was not outputting logs properly (a restart of the service seemed to
> fix that).
>
> *HOWEVER*.. Once I switched to an "allow lateness" style timestamp
> assigner, I still get the same problem.
>
> Fundamentally, I think I've come to the realisation that my concept of
> Event Time is not the same as Flink's. So let me start again and explain
> what I'd like to do.
>
> I'd like to group log records and sum up the metric values according to
> their timestamps: effectively a SQL GROUP BY where the timestamp of the
> event is aggregated into 15-minute chunks. I'd like to (eventually) be
> able to run this same job for both batch and live streaming.
>
> I had thought that by assigning the log row's timestamp to BE the Event
> Time, the time-window aggregation would work. But now I wonder if what I
> should be doing is assigning a new field, called 'time bracket' perhaps,
> and adding that as one of the keys for the stream (I was keying by
> "identifierType:identifier", so change that to
> "identifierType:identifier:timeBracket"). This way the Fold function can
> still sum up the metrics by this group-by key, and then rely on some
> periodic *actual time* watermarking (ingestion or processing).
>
> I think I should disconnect the Flink time system from this aggregation
> level. I had thought that because I wanted to be able to apply this job to
> both live streaming and batch processing (historical logs), I needed to
> use the Event Time model.
>
> That could be my fatal flaw. Does that make sense?
>
> Paul
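[For illustration, the 'time bracket' key Paul describes might be built like this; the helper, field, and method names here are hypothetical, not taken from his job:]

    // Truncate the event timestamp to its 15-minute boundary and append it
    // to the existing "identifierType:identifier" group-by key.
    public static String bracketedKey(String identifierType, String identifier, long timestampMillis) {
        final long bracketMs = 15 * 60 * 1000L;
        long timeBracket = (timestampMillis / bracketMs) * bracketMs;
        return identifierType + ":" + identifier + ":" + timeBracket;
    }

[With a key like that, the fold can group by bracket while any windowing runs on ingestion or processing time instead of event time.]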
> *From: *Fabian Hueske <fhue...@gmail.com>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Thursday, 16 March 2017 at 10:22 pm
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Batch stream Sink delay ?
>
> What kind of timestamp and watermark extractor are you using? Can you
> share your implementation?
>
> You can have a look at the example programs (for example [1]). These can
> be started and debugged inside an IDE by executing the main method. If you
> run it in an external process, you should be able to connect to the
> process with the standard options.
>
> Best, Fabian
>
> [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
>
> 2017-03-16 12:10 GMT+01:00 Paul Smith <psm...@aconex.com>:
>
> Thanks again for your reply.
>
> I've tried with parallelism 1 through to 3. Same behaviour.
>
> The log file has monotonically increasing timestamps generated by an
> application using log4j. Each log line has a distinct, incrementing
> timestamp. It is an 8GB file I'm using as a test case, with about 8
> million rows.
>
> Whether parallelism is 1 or 3, the same output is shown: the data gets to
> the sink at the end and all looks correct. I set the folded result's
> record timestamp to the end of each window, and I see nice chunks of
> 15-minute blocks in the result.
>
> I'm not sure why the watermarks are not being sent as the data progresses.
>
> I might try pushing the same data through Kafka and see if I get a
> different result. I might also take a sample through the file (rather than
> the whole file) to see if I get differing results.
>
> Is there a wiki page anywhere that shows how to debug a job through an
> IDE? Can I easily remote-attach to a running process via standard Java
> options?
>
> Regards
>
> Paul
>
> On 16 Mar 2017, at 21:15, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Paul,
>
> since each operator uses the minimum watermark of all its inputs, you must
> ensure that each parallel task is producing data. If a source does not
> produce data, it will not increase the timestamps of its watermarks.
>
> Another challenge that you might run into is that you need to make sure
> that the file (or file chunks, if it is split for parallel reading) is
> read in increasing timestamp order. Otherwise, watermarks will be emitted
> too early.
>
> If I understood your use case correctly, you are just experimenting with
> file input to get a feeling for the API. I would try to set the
> parallelism of the file source to 1 to ensure that the data is read in the
> same order and that all tasks are producing data.
>
> Hope this helps,
>
> Fabian
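[As a sketch of that last suggestion; env is the StreamExecutionEnvironment and the file path is made up:]

    // Force the file source down to a single task so the file is read in one
    // pass, in timestamp order, and no idle subtask holds the watermark back.
    DataStream<String> lines = env
            .readTextFile("/path/to/request.log")
            .setParallelism(1);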
> 2017-03-15 23:54 GMT+01:00 Paul Smith <psm...@aconex.com>:
>
> Thanks Fabian, I'm pretty sure you are correct here. I can see in the
> metric view that the currentLowWaterMark is set to MIN_VALUE by the looks
> of it, so watermarks are not being emitted at all until the end. This
> stays all the way through the job.
>
> I'm not sure why this is the case. I've verified that my
> TimestampExtractor class is being called and returning the value I'd
> expect (the timestamp from the log line), and it looks legitimate. My
> WindowFunction, which is doing the aggregation, is not being called until
> right at the end of the job, but the windows come out OK in the
> destination.
>
> I am not sure how to debug this one. Do you or anyone have any other
> suggestions on how to debug why the windowing is not being triggered? I
> can't glean any useful further ideas from the metrics I can see.
>
> Appreciate the help
>
> Cheers,
>
> Paul
>
> *From: *Fabian Hueske <fhue...@gmail.com>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Tuesday, 14 March 2017 at 7:59 pm
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Batch stream Sink delay ?
>
> Hi Paul,
>
> This might be an issue with the watermarks. A window operation can only
> compute and emit its results when the watermark time is later than the end
> time of the window.
>
> Each operator keeps track of the maximum watermark it has received from
> each of its input tasks and computes its own time as the minimum of those
> maximums. If one (or all) of the watermarks received by the window
> operator is not later than the end time of the window, the window will not
> be computed.
>
> When a file input is completely processed, Flink sends a Long.MAX_VALUE
> watermark, which might trigger the execution at the end of the job.
>
> I would try to debug the watermarks of your job. The web dashboard
> provides a few metrics for that.
>
> Best, Fabian
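[One way to watch the watermark from inside the job, sketched here with the ProcessFunction introduced in Flink 1.2; keyedStream and the Record type are placeholders, and this assumes a keyed stream:]

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Pass-through function that prints each element's timestamp next to the
    // operator's current watermark, to show whether the watermark ever moves
    // past Long.MIN_VALUE before the end of the job.
    keyedStream.process(new ProcessFunction<Record, Record>() {
        @Override
        public void processElement(Record value, Context ctx, Collector<Record> out) throws Exception {
            System.out.println("ts=" + ctx.timestamp()
                    + " watermark=" + ctx.timerService().currentWatermark());
            out.collect(value);
        }
    });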
> 2017-03-14 2:47 GMT+01:00 Paul Smith <psm...@aconex.com>:
>
> Using Flink 1.2.
>
> Hi all, I have a question about batch processing and sinks. I have a
> Flink job that parses a user request log file containing performance data
> per request. It accumulates metric data values into 15-minute time
> windows. Each log line is mapped to multiple records, so that each metric
> value can be 'billed' against a few different categories (User,
> Organisation, Project, Action). The goal of the Flink job is to distil the
> volume of request data into more manageable summaries. We can then see
> breakdowns of, say, user CPU utilised by distinct categories (e.g. users)
> and look for trends/outliers.
>
> It is working very well as a batch, but with one unexpected behaviour.
>
> What I can't understand is that the sink does not appear to get _any_
> records until the rest of the chain has completed over the entire file.
> I've played around with parallelism (1->3), and also verified with logging
> that the sink isn't seeing any data until the entire previous chain is
> complete. Is this expected behaviour? I was thinking that as each time
> window passed, a 'block' of the results would be emitted to the sink.
> Since we use Event Time characteristics, the batch job ought to emit these
> chunks as each 15-minute segment passes?
>
> You can see the base job here, with an example of a single log line with
> the metrics:
>
> https://gist.github.com/tallpsmith/a2e5212547fb3c7220b0e49846d2f152
>
> Each category I've called an 'identifierType' (User, Organisation..), with
> the 'identifier' the value of that (a UserId, for example). I key the
> stream by this pair and then fold the records by the time window, summing
> each metric type's value up. I have yet to work out the proper use case
> for Fold versus Reduce; I may have got that wrong, but I can't see how
> that changes the flow here.
>
> The output is a beautiful rolled-up summary by 15 minutes by each
> identifierType & identifier. I have yet to attempt a live streaming
> version of this, but had thought the batch version would also be
> concurrent and start emitting 15-minute windows as soon as the stream
> chunks transition into the next window. Given the entire job takes about 5
> minutes on my laptop for 8 million raw source lines whose data is spread
> out over an entire day, I would have thought there's some part of that 5
> minutes that would be emitting chunks of summary data to the sink? But
> nothing turns up until the entire job is done.
>
> Maybe the data is just too small.. Maybe there's buffering going on
> somewhere in the chain?
>
> Any pointers would be appreciated in understanding the flow here.
>
> Cheers,
>
> Paul Smith
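[On the Fold-versus-Reduce question raised above, the distinction roughly is as follows; windowedStream, LogRecord, MetricSummary, and their methods are hypothetical stand-ins, not the gist's actual types:]

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;

    // reduce: combines two elements of the *same* type into one.
    windowedStream.reduce(new ReduceFunction<MetricSummary>() {
        @Override
        public MetricSummary reduce(MetricSummary a, MetricSummary b) {
            return a.merge(b); // merge two partial summaries
        }
    });

    // fold: folds each input element into an accumulator of a possibly
    // *different* type, starting from an initial value. That matches the
    // "empty summary, then add each record's metrics" pattern in this job.
    windowedStream.fold(new MetricSummary(), new FoldFunction<LogRecord, MetricSummary>() {
        @Override
        public MetricSummary fold(MetricSummary acc, LogRecord record) {
            return acc.add(record); // add one record's metric values
        }
    });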