Thanks again for your reply.

I've tried parallelism settings of 1 through 3. Same behaviour.

The log file has monotonically increasing timestamps, generated by an 
application using log4j. Each log line has a distinct, incrementing timestamp. 
It is an 8GB file I'm using as a test case, with about 8 million rows.

Whether the parallelism is 1 or 3, the output is the same: the data reaches the 
sink at the end and all looks correct. I set the folded result record's 
timestamp to the end of each window, and I see nice chunks of 15-minute blocks 
in the result.

I'm not sure why the watermarks are not being sent as the data progresses.

I might try pushing the same data through Kafka and see if I get a different 
result. I might also take a sample of the file (rather than the whole file) to 
see if I get differing results.

Is there a wiki page anywhere that shows how to debug a job through an IDE?  
Can I easily remote attach to a running process via standard Java options?
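I'm assuming something like adding the standard JDWP options to the JVM via 
env.java.opts in flink-conf.yaml would do it, e.g.:

    env.java.opts: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"

and then attaching the IDE as a remote debug session on port 5005? Guessing 
here, so please correct me if that's not the right config key.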

Regards

Paul

On 16 Mar 2017, at 21:15, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Paul,

since each operator uses the minimum watermark across all of its inputs, you 
must ensure that each parallel task is producing data.
If a source task does not produce data, it will not advance the timestamps of 
its watermarks.

Another challenge that you might run into is that you need to make sure the 
file (or the file chunks, if it is split for parallel reading) is read in 
increasing timestamp order.
Otherwise, watermarks will be emitted too early.

If I understood your use case correctly, you are just experimenting with file 
input to get a feeling for the API.
I would try setting the parallelism of the file source to 1 to ensure that the 
data is read in order and that all tasks are producing data.
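Just as a rough sketch of what I mean (the record type, parser, and accessor 
names below are placeholders, not your actual code):

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<LogRecord> records = env
        .readTextFile("/path/to/requests.log")
        .setParallelism(1)                       // single reader task -> records arrive in timestamp order
        .map(new ParseLogLine())                 // placeholder for your log-line parser
        .assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<LogRecord>() {
                @Override
                public long extractAscendingTimestamp(LogRecord record) {
                    return record.getTimestampMillis();   // placeholder accessor
                }
            });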

Hope this helps,
Fabian

2017-03-15 23:54 GMT+01:00 Paul Smith <psm...@aconex.com>:
Thanks Fabian, I’m pretty sure you are correct here.  I can see in the Metrics 
view that the currentLowWaterMark appears to be set to MIN_VALUE, so watermarks 
are not being emitted at all until the end.  It stays that way all the way 
through the job.

I’m not sure why this is the case.  I’ve verified that my TimestampExtractor 
class is being called and is returning the value I’d expect (the timestamp from 
the log line), which looks legitimate.  My WindowFunction, which is doing the 
aggregation, is not being called until right at the end of the job, yet the 
windows come out ok in the destination.

I am not sure how to debug this one.  Do you or anyone else have any other 
suggestions on how to debug why the windowing is not being triggered?  I can’t 
glean any further useful ideas from the metrics I can see.

Appreciate the help

Cheers,

Paul

From: Fabian Hueske <fhue...@gmail.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Tuesday, 14 March 2017 at 7:59 pm
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Batch stream Sink delay ?

Hi Paul,
This might be an issue with the watermarks. A window operator can only compute 
and emit its results when the watermark time is later than the end time of the 
window.
Each operator keeps track of the maximum timestamp received from each of its 
input tasks and computes its own time as the minimum of those maximum 
timestamps.
If one (or all) of the watermarks received by the window operator is not later 
than the end time of the window, the window will not be computed.
When a file input is completely processed, Flink sends a Long.MAX_VALUE 
watermark, which might be what triggers the execution at the end of the job.
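To make that concrete: a periodic watermark assigner essentially just tracks 
the highest timestamp it has seen and reports it as the watermark, roughly like 
this (purely illustrative, the record type and accessor are placeholders):

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    // Sketch: the watermark follows the maximum event timestamp seen so far.
    public static class MaxSeenWatermarkAssigner
            implements AssignerWithPeriodicWatermarks<LogRecord> {   // LogRecord is a placeholder type

        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(LogRecord record, long previousElementTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, record.getTimestampMillis());
            return record.getTimestampMillis();
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Called periodically by Flink (interval set via ExecutionConfig#setAutoWatermarkInterval).
            return new Watermark(maxTimestamp);
        }
    }

A 15-minute window for [12:00, 12:15) will only fire once the watermark passes 
12:15; if the watermark stays at Long.MIN_VALUE, nothing fires until the final 
Long.MAX_VALUE watermark at the end of the input.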
I would try to debug the watermarks of your job. The web dashboard provides a 
few metrics for that.
Best, Fabian

2017-03-14 2:47 GMT+01:00 Paul Smith <psm...@aconex.com>:
Using Flink 1.2.

Hi all, I have a question about Batch processing and Sinks.  I have a Flink job 
that parses a User Request log file that contains performance data per request. 
It accumulates metric data values into 15-minute time windows.  Each log line 
is mapped to multiple records, so that each of the metric values can be 'billed' 
against a few different categories (User, Organisation, Project, Action).  The 
goal of the Flink job is to distil the volume of request data into more 
manageable summaries, so we can see breakdowns of, say, User CPU utilised by 
distinct categories (e.g. Users) and look for trends/outliers.

It is working very well as a batch, but with one unexpected behaviour.

What I can't understand is that the Sink does not appear to get _any_ records 
until the rest of the chain has completed over the entire file.  I've played 
around with parallelism (1->3), and also verified with logging that the Sink 
isn't seeing any data until the entire previous chain is complete.  Is this 
expected behaviour? I was thinking that as each time window passed, a 'block' of 
the results would be emitted to the sink.  Since we use Event Time 
characteristics, shouldn't the batch job emit these chunks as each 15-minute 
segment passes?

You can see the base job here, with an example of a single log line with the 
metrics here:

https://gist.github.com/tallpsmith/a2e5212547fb3c7220b0e49846d2f152

Each category I've called an 'identifierType' (User, Organisation, ...), with the 
'identifier' being the value of that (a UserId, for example).  I key the stream by 
this pair of fields and then Fold the records within each time window, summing up 
each metric type's value.  I have yet to work out the proper use case for Fold 
versus Reduce (I may have got that wrong), but I can't see how that changes the 
flow here.
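
For reference, the shape of the job is roughly the following (simplified; the 
type names here are placeholders, the real code is in the gist above):

    // Simplified shape: key by (identifierType, identifier), 15-minute
    // event-time windows, fold to sum up each metric type's value.
    records
        .keyBy("identifierType", "identifier")              // field-expression keying on a POJO
        .timeWindow(Time.minutes(15))
        .fold(new MetricSummary(), new FoldFunction<LogRecord, MetricSummary>() {
            @Override
            public MetricSummary fold(MetricSummary summary, LogRecord record) {
                summary.add(record);                        // placeholder: accumulate the metric values
                return summary;
            }
        })
        .addSink(summarySink);                              // placeholder sink

(My understanding is that the main difference is that Fold lets the accumulator 
be a different type from the input records, whereas Reduce requires input and 
output to be the same type.)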

The output is a beautiful rolled-up summary by 15 minutes for each 
identifierType & identifier.  I have yet to attempt a live Streaming version of 
this, but I had thought the Batch version would also be concurrent and start 
emitting 15-minute windows as soon as the stream transitions into the next 
window.   Given the entire job takes about 5 minutes on my laptop for 8 million 
raw source lines whose data is spread out over an entire day, I would have 
thought there's some part of that 5 minutes that would be emitting chunks of 
summary data to the sink?  But nothing turns up until the entire job is done.

Maybe the data is just too small, or maybe there's buffering going on somewhere 
in the chain?

Any pointers would be appreciated in understanding the flow here.

Cheers,

Paul Smith



