Hi Aljoscha,

Thanks for your response.

We wanted to customize the watermark period calculation since we were not
getting the desired results with BoundedOutOfOrdernessGenerator class.

As for the current problem, we have identified the problem of why the
records are not processed as expected. It is related to the watermark
calculation. The formula is to assign the sliding window size (in
milliseconds) as maximum out of orderness parameter  and add it to the
current maximum timestamp while generating new watermark.

i.e.

   @Override
   public Watermark getCurrentWatermark() {

    Watermark watermark = new Watermark(currentMaxTimestamp +
maxOutOfOrderness);

       return watermark;
   }


*Sujit Sakre*

On 2 February 2017 at 21:09, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> what about using BoundedOutOfOrdernessGenerator? Why did that not work
> for your case?
>
> Cheers,
> Aljoscha
>
> On Mon, 30 Jan 2017 at 17:20 Sujit Sakre <sujit.sa...@northgateps.com>
> wrote:
>
>> Hi Robert, Aljoscha,
>>
>> Many thanks for pointing out. Watermark generation is the problem. It is
>> generating timestamps far ahead of current year due to our code which tried
>> to cover all records but inadvertently made a very large watermark.
>>
>> We have tried fixing this with other combinations of generating
>> watermarks, however, we are unable to find the right combination and end up
>> not processing at least one record from our dataset.
>>
>> Is there a formula or algorithm for generating the right watermark?
>> Please could suggest.
>>
>> Thanks again.
>>
>>
>> *Sujit Sakre*
>>
>> On 26 January 2017 at 20:17, Robert Metzger <rmetz...@apache.org> wrote:
>>
>> Hi,
>> I would guess that the watermark generation does not work as expected.
>> I would recommend to log the extracted timestamps + the watermarks to
>> understand how time is progressing, and when watermarks are generated to
>> trigger a window computation.
>>
>> On Tue, Jan 24, 2017 at 6:53 PM, Sujit Sakre <sujit.sa...@northgateps.com
>> > wrote:
>>
>> Hi Aljoscha,
>>
>> Thanks.
>>
>> Yes, we are using Event Time.
>> Yes, Flink program is kept running in the IDE, i.e. eclipse and not
>> closed, after the first batch of events and when adding the second batch.
>> Yes, We do have acustom timestamp/watermark assigner, implemented as
>> BoundedOutOfOrdernessGenerator2
>>
>> Are we using the properties for Kafka correctly?
>> We are using Flink 1.1.1 and Flink Kafka connector:
>> flink-connector-kafka-0.9_2.11
>>
>> More about the behavior:
>> I have noticed that sometimes even after the first writing to the Kafka
>> queue,  and when the Flink program runs, sometimes it does process the
>> queue immediately. We need to restart. This is quite random.
>>
>> Following is the rough outline of our code.
>>
>> public class SlidingWindow2{
>>
>> public static void main(String[] args) throws Exception {
>>
>> // set up the execution environment
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.
>> getExecutionEnvironment();
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> // configure the Kafka consumer
>> Properties kafkaProps = new Properties();
>> kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>> kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>> kafkaProps.setProperty("group.id", "demo");
>> // always read the Kafka topic from the start
>> kafkaProps.setProperty("auto.offset.reset" ,"earliest");
>>
>>                 FlinkKafkaConsumer09<Tuple5<String, String, Float,
>> Float, String>> consumer = new FlinkKafkaConsumer09<>(
>> "test",            // kafka topic name
>> new dataSchema(),
>> kafkaProps);
>>                 DataStream<Tuple5<String, String, Float, Float, String>>
>> stream1 = env.addSource(consumer);
>>                 DataStream<Tuple5<String, String, Float, Float, String>>
>> keyedStream = stream1.assignTimestampsAndWatermarks(new
>> BoundedOutOfOrdernessGenerator2());
>>
>>                 keyedStream.keyBy(4)
>> .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
>> .apply(new CustomSlidingWindowFunction());
>>
>>                 env.execute("Sliding Event Time Window Processing");
>>
>>            }
>> }
>>
>>
>> public static class CustomSlidingWindowFunction implements
>> WindowFunction<Tuple5<String, String, Float, Float, String>, Tuple5<String,
>> String, Float, Float, String>, Tuple, TimeWindow>{
>>
>> @Override
>> public void apply(Tuple key, TimeWindow window, Iterable<Tuple5<String,
>> String, Float, Float, String>> input,
>> Collector<Tuple5<String, String, Float, Float, String>> out) throws
>> Exception {
>>
>> ....
>>         }
>>
>>
>> // Implemented custom Periodic Watermark as below from public static
>> class BoundedOutOfOrdernessGenerator2 implements
>> AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float,
>> String>> { /** * */ private static final long serialVersionUID = 1L;
>> private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in
>> seconds private long currentMaxTimestamp; @Override public long
>> extractTimestamp(Tuple5<String, String, Float, Float, String> element,
>> long previousElementTimestamp) { //System.out.println("inside
>> extractTimestamp"); Date parseDate = null; SimpleDateFormat dateFormat =
>> new SimpleDateFormat("dd-MM-yyyy HH:mm:ss"); try { parseDate =
>> dateFormat.parse(element.f0); } catch (ParseException e) {
>> e.printStackTrace(); } long timestamp = parseDate.getTime();
>> currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return
>> timestamp; } @Override public Watermark getCurrentWatermark() { // return
>> the watermark as twice the current highest timestamp minus the
>> out-of-orderness bound // this is because it is not covering the lateness
>> sufficiently; now it does // in future this may be multiple of 3 or more if
>> necessary to cover the gap in records received return new
>> Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness); } }
>>
>>
>>
>>
>>
>> *Sujit Sakre*
>>
>>
>>
>> On 24 January 2017 at 22:34, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> Hi,
>> a bit more information would be useful. Are you using event-time? Is the
>> Flink program kept running after adding the first batch of events and when
>> adding the second batch or is it to invocations of your Flink program? Do
>> you have a custom timestamp/watermark assigner?
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 24 Jan 2017 at 14:28 Sujit Sakre <sujit.sa...@northgateps.com>
>> wrote:
>>
>> Hi,
>>
>> We are using a sliding window function to process data read from Kafka
>> Stream. We are using FlinkKafkaConsumer09 to read the data. The window
>> function and sink are running correctly.
>>
>> To test the program, we are generating a stream of data from command line.
>> This works when we add set of records once. When we add again, it does
>> not work, Flink produces no result, even though the records are added to
>> same Kafka topic from the same command line instance.
>>
>> Please could you suggest what could be wrong.
>>
>> Many thanks.
>>
>>
>> *Sujit Sakre*
>>
>> This email is sent on behalf of Northgate Public Services (UK) Limited
>> and its associated companies including Rave Technologies (India) Pvt
>> Limited (together "Northgate Public Services") and is strictly confidential
>> and intended solely for the addressee(s).
>> If you are not the intended recipient of this email you must: (i) not
>> disclose, copy or distribute its contents to any other person nor use its
>> contents in any way or you may be acting unlawfully;  (ii) contact
>> Northgate Public Services immediately on +44(0)1908 264500
>> <+44%201908%20264500> quoting the name of the sender and the addressee
>> then delete it from your system.
>> Northgate Public Services has taken reasonable precautions to ensure that
>> no viruses are contained in this email, but does not accept any
>> responsibility once this email has been transmitted.  You should scan
>> attachments (if any) for viruses.
>>
>> Northgate Public Services (UK) Limited, registered in England and Wales
>> under number 00968498 with a registered address of Peoplebuilding 2,
>> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
>> 4NN.  Rave Technologies (India) Pvt Limited, registered in India under
>> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
>> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
>>
>>
>>
>> This email is sent on behalf of Northgate Public Services (UK) Limited
>> and its associated companies including Rave Technologies (India) Pvt
>> Limited (together "Northgate Public Services") and is strictly confidential
>> and intended solely for the addressee(s).
>> If you are not the intended recipient of this email you must: (i) not
>> disclose, copy or distribute its contents to any other person nor use its
>> contents in any way or you may be acting unlawfully;  (ii) contact
>> Northgate Public Services immediately on +44(0)1908 264500
>> <+44%201908%20264500> quoting the name of the sender and the addressee
>> then delete it from your system.
>> Northgate Public Services has taken reasonable precautions to ensure that
>> no viruses are contained in this email, but does not accept any
>> responsibility once this email has been transmitted.  You should scan
>> attachments (if any) for viruses.
>>
>> Northgate Public Services (UK) Limited, registered in England and Wales
>> under number 00968498 with a registered address of Peoplebuilding 2,
>> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
>> 4NN.  Rave Technologies (India) Pvt Limited, registered in India under
>> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
>> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
>>
>>
>>
>>
>> This email is sent on behalf of Northgate Public Services (UK) Limited
>> and its associated companies including Rave Technologies (India) Pvt
>> Limited (together "Northgate Public Services") and is strictly confidential
>> and intended solely for the addressee(s).
>> If you are not the intended recipient of this email you must: (i) not
>> disclose, copy or distribute its contents to any other person nor use its
>> contents in any way or you may be acting unlawfully;  (ii) contact
>> Northgate Public Services immediately on +44(0)1908 264500
>> <+44%201908%20264500> quoting the name of the sender and the addressee
>> then delete it from your system.
>> Northgate Public Services has taken reasonable precautions to ensure that
>> no viruses are contained in this email, but does not accept any
>> responsibility once this email has been transmitted.  You should scan
>> attachments (if any) for viruses.
>>
>> Northgate Public Services (UK) Limited, registered in England and Wales
>> under number 00968498 with a registered address of Peoplebuilding 2,
>> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
>> 4NN.  Rave Technologies (India) Pvt Limited, registered in India under
>> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
>> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
>>
>

-- 
This email is sent on behalf of Northgate Public Services (UK) Limited and 
its associated companies including Rave Technologies (India) Pvt Limited 
(together "Northgate Public Services") and is strictly confidential and 
intended solely for the addressee(s). 
If you are not the intended recipient of this email you must: (i) not 
disclose, copy or distribute its contents to any other person nor use its 
contents in any way or you may be acting unlawfully;  (ii) contact 
Northgate Public Services immediately on +44(0)1908 264500 quoting the name 
of the sender and the addressee then delete it from your system.
Northgate Public Services has taken reasonable precautions to ensure that 
no viruses are contained in this email, but does not accept any 
responsibility once this email has been transmitted.  You should scan 
attachments (if any) for viruses.

Northgate Public Services (UK) Limited, registered in England and Wales 
under number 00968498 with a registered address of Peoplebuilding 2, 
Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2 
4NN.  Rave Technologies (India) Pvt Limited, registered in India under 
number 117068 with a registered address of 2nd Floor, Ballard House, Adi 
Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.

Reply via email to