Ok, thanks for letting us know!

On Thu, 2 Feb 2017 at 17:40 Sujit Sakre <sujit.sa...@northgateps.com> wrote:

> Implementing this formula seems to have solved our problem now. Thanks.
>
> On 2 February 2017 at 21:21, Sujit Sakre <sujit.sa...@northgateps.com>
> wrote:
>
> Hi Aljoscha,
>
> Thanks for your response.
>
> We wanted to customize the periodic watermark calculation because we were
> not getting the desired results with the BoundedOutOfOrdernessGenerator class.
>
> As for the current problem, we have identified why the records are not
> processed as expected: it is related to the watermark calculation. The
> formula is to use the sliding window size (in milliseconds) as the maximum
> out-of-orderness parameter and add it to the current maximum timestamp
> when generating a new watermark.
>
> i.e.
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         Watermark watermark = new Watermark(currentMaxTimestamp + maxOutOfOrderness);
>         return watermark;
>     }
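The effect of that formula can be checked with a small plain-Java model. The class `LeadingWatermarkModel` is hypothetical and has no Flink dependency. It uses the 6-minute window sliding every 2 minutes from the code quoted further down, and it assumes Flink's rule that a window fires once the watermark reaches the window's end timestamp minus one millisecond.

```java
// Hypothetical, dependency-free model of the formula described above
// (watermark = largest timestamp seen + maxOutOfOrderness); no Flink classes.
class LeadingWatermarkModel {
    static final long MINUTE = 60_000L;
    static final long WINDOW_SIZE = 6 * MINUTE; // as in SlidingEventTimeWindows.of(Time.minutes(6), ...)
    static final long SLIDE = 2 * MINUTE;       // ... sliding every Time.minutes(2)

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    LeadingWatermarkModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Track the largest event timestamp seen so far.
    void onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    // The watermark runs AHEAD of the largest timestamp seen so far.
    long currentWatermark() {
        return currentMaxTimestamp + maxOutOfOrderness;
    }

    // Exclusive end of the latest sliding window containing t
    // (windows aligned to multiples of SLIDE, t >= 0 assumed).
    static long lastWindowEnd(long t) {
        return t - (t % SLIDE) + WINDOW_SIZE;
    }

    // Assumed firing rule: a window [start, end) fires once the watermark reaches end - 1.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }
}
```

For an event at minute 13, the latest window containing it is [12, 18) minutes. The leading watermark (minute 19) already covers that window. A trailing watermark of "max minus 6 minutes" would not fire it until more data arrives. The flip side, in this model, concerns records that arrive after such a watermark has been emitted. If such a record's timestamp is not newer than the largest one seen, it falls only into windows that have already fired, so it would be treated as late.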
>
>
> *Sujit Sakre*
>
> On 2 February 2017 at 21:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> what about using BoundedOutOfOrdernessGenerator? Why did that not work for
> your case?
>
> Cheers,
> Aljoscha
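The BoundedOutOfOrdernessGenerator that Aljoscha mentions is the example periodic assigner from the Flink documentation. Its watermark is the largest timestamp seen so far minus a fixed out-of-orderness bound. The sketch below models only that arithmetic in plain Java so it can run without Flink. The class name, the `seenAny` guard, and the 3.5-second bound used in the usage note are assumptions for illustration.

```java
// Hypothetical, dependency-free model of the "current max timestamp minus bound"
// periodic watermark pattern; it does not use Flink classes.
class BoundedWatermarkModel {
    private final long maxOutOfOrderness; // milliseconds
    private long currentMaxTimestamp;
    private boolean seenAny = false;

    BoundedWatermarkModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Mirrors extractTimestamp: track the largest event timestamp seen so far.
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = seenAny ? Math.max(currentMaxTimestamp, eventTimestamp) : eventTimestamp;
        seenAny = true;
        return eventTimestamp;
    }

    // Mirrors getCurrentWatermark: the watermark trails the max timestamp by the bound,
    // so events up to maxOutOfOrderness behind the max are still on time.
    // Before any event, return Long.MIN_VALUE instead of underflowing.
    long currentWatermark() {
        return seenAny ? currentMaxTimestamp - maxOutOfOrderness : Long.MIN_VALUE;
    }
}
```

With a 3.5 s bound, events at 10 s and then 8 s give a watermark of 6.5 s, and the out-of-order 8 s event does not pull it back. Unlike the formula adopted later in this thread, this watermark trails the data. With a finite test input, the last windows may therefore only fire once newer records push the watermark past their end.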
>
> On Mon, 30 Jan 2017 at 17:20 Sujit Sakre <sujit.sa...@northgateps.com>
> wrote:
>
> Hi Robert, Aljoscha,
>
> Many thanks for pointing this out. Watermark generation is the problem. It
> generates watermarks far ahead of the current year, because our code, which
> tried to cover all records, inadvertently produced a very large watermark.
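The next snippet is a small stand-alone Java check of how large the watermark gets under the `currentMaxTimestamp * 2 - maxOutOfOrderness` formula from BoundedOutOfOrdernessGenerator2. It is a sketch with several assumptions. The helper class and method names are hypothetical. The sample date of 24 January 2017 (UTC) is just an illustrative timestamp from the period of this thread, and the 5-second bound is an arbitrary stand-in for MAX_EVENT_DELAY.

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper to inspect the doubled watermark formula.
class DoubledWatermarkCheck {
    // The formula used in BoundedOutOfOrdernessGenerator2.getCurrentWatermark().
    static long doubledWatermark(long currentMaxTimestamp, long maxOutOfOrderness) {
        return currentMaxTimestamp * 2 - maxOutOfOrderness;
    }

    // Calendar year (UTC) of an epoch-millisecond value.
    static int yearOf(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).getYear();
    }
}
```

Doubling an epoch-millisecond timestamp from early 2017 lands the watermark in 2064. If that reading is right, the very first watermark would mark every 2017 window as complete. Records from a second batch would then all arrive as late data, which would plausibly explain why the second batch produced no output.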
>
> We have tried fixing this with other ways of generating watermarks;
> however, we have not found the right combination, and we end up not
> processing at least one record from our dataset.
>
> Is there a formula or algorithm for generating the right watermark? Could
> you please suggest one?
>
> Thanks again.
>
>
> *Sujit Sakre*
>
> On 26 January 2017 at 20:17, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi,
> I would guess that the watermark generation does not work as expected.
> I would recommend logging the extracted timestamps and the watermarks to
> understand how time is progressing and when watermarks are generated that
> trigger a window computation.
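Robert's suggestion can be sketched without a running Flink job. The hypothetical `WatermarkTrace` helper below records each extracted timestamp and each watermark as an ISO-8601 instant, and it flags any watermark that is ahead of the largest timestamp seen. In a real job, the two calls would go inside extractTimestamp and getCurrentWatermark, for example through a logger. The class itself is an assumption, not a Flink API.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical tracing helper: keeps human-readable lines for every extracted
// timestamp and every watermark, so a runaway watermark is easy to spot.
class WatermarkTrace {
    private final List<String> lines = new ArrayList<>();
    private long currentMaxTimestamp = Long.MIN_VALUE;

    // Call where the timestamp is extracted (e.g. inside extractTimestamp).
    void onTimestamp(long ts) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        lines.add("timestamp=" + Instant.ofEpochMilli(ts));
    }

    // Call where the watermark is produced (e.g. inside getCurrentWatermark).
    void onWatermark(long wm) {
        String flag = wm > currentMaxTimestamp ? " (AHEAD of max timestamp)" : "";
        lines.add("watermark=" + Instant.ofEpochMilli(wm) + flag);
    }

    List<String> lines() {
        return lines;
    }
}
```

Printing the watermark as a date rather than a raw long makes a value decades in the future stand out immediately.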
>
> On Tue, Jan 24, 2017 at 6:53 PM, Sujit Sakre <sujit.sa...@northgateps.com>
> wrote:
>
> Hi Aljoscha,
>
> Thanks.
>
> Yes, we are using event time.
> Yes, the Flink program is kept running in the IDE (Eclipse) and is not
> closed after the first batch of events, including when we add the second batch.
> Yes, we do have a custom timestamp/watermark assigner, implemented as
> BoundedOutOfOrdernessGenerator2.
>
>
> Are we using the Kafka properties correctly?
> We are using Flink 1.1.1 and the Flink Kafka connector
> flink-connector-kafka-0.9_2.11.
>
> More about the behavior:
> I have noticed that sometimes, even after the first write to the Kafka
> topic while the Flink program is running, it does not process the queue
> immediately and we need to restart it. This is quite random.
>
> Following is the rough outline of our code.
>
> import java.text.ParseException;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.Properties;
>
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple5;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.util.Collector;
>
> public class SlidingWindow2 {
>
>     public static void main(String[] args) throws Exception {
>         // set up the execution environment
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // configure the Kafka consumer
>         Properties kafkaProps = new Properties();
>         kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>         kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>         kafkaProps.setProperty("group.id", "demo");
>         // always read the Kafka topic from the start
>         kafkaProps.setProperty("auto.offset.reset", "earliest");
>
>         FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
>                 new FlinkKafkaConsumer09<>(
>                         "test",           // Kafka topic name
>                         new dataSchema(), // custom deserialization schema (not shown)
>                         kafkaProps);
>
>         DataStream<Tuple5<String, String, Float, Float, String>> stream1 =
>                 env.addSource(consumer);
>
>         // attach event-time timestamps and periodic watermarks
>         DataStream<Tuple5<String, String, Float, Float, String>> timestampedStream =
>                 stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());
>
>         // 6-minute windows sliding every 2 minutes, keyed by the fifth tuple field
>         timestampedStream.keyBy(4)
>                 .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
>                 .apply(new CustomSlidingWindowFunction());
>
>         env.execute("Sliding Event Time Window Processing");
>     }
>
>     public static class CustomSlidingWindowFunction implements
>             WindowFunction<Tuple5<String, String, Float, Float, String>,
>                            Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {
>
>         @Override
>         public void apply(Tuple key, TimeWindow window,
>                           Iterable<Tuple5<String, String, Float, Float, String>> input,
>                           Collector<Tuple5<String, String, Float, Float, String>> out)
>                 throws Exception {
>             // ....
>         }
>     }
>
>     // Custom periodic watermark assigner, implemented as below
>     public static class BoundedOutOfOrdernessGenerator2 implements
>             AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {
>
>         private static final long serialVersionUID = 1L;
>
>         private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
>
>         private long currentMaxTimestamp;
>
>         @Override
>         public long extractTimestamp(Tuple5<String, String, Float, Float, String> element,
>                                      long previousElementTimestamp) {
>             //System.out.println("inside extractTimestamp");
>             Date parseDate = null;
>             SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
>             try {
>                 parseDate = dateFormat.parse(element.f0);
>             } catch (ParseException e) {
>                 e.printStackTrace();
>             }
>             long timestamp = parseDate.getTime();
>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>             return timestamp;
>         }
>
>         @Override
>         public Watermark getCurrentWatermark() {
>             // return the watermark as twice the current highest timestamp minus the
>             // out-of-orderness bound
>             // this is because it is not covering the lateness sufficiently; now it does
>             // in future this may be multiple of 3 or more if necessary to cover the gap
>             // in records received
>             return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
>         }
>     }
> }
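For reference, the next snippet is a simplified, dependency-free model of how a sliding event-time window assigner maps one timestamp onto windows. It uses the size and slide from the code above (6 minutes, sliding every 2 minutes, no offset). It is written to match our understanding of SlidingEventTimeWindows, but the class and method names are hypothetical, and this is not Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of sliding event-time window assignment (no offset).
class SlidingWindowModel {
    static final long MINUTE = 60_000L;

    // Returns [start, end) pairs of every window containing timestamp,
    // latest window first (assumes timestamp >= 0).
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

Each record therefore belongs to three overlapping windows. Its last window closes up to 6 minutes after the record's own timestamp, and the watermark has to reach that point before the record's final window result is emitted.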
>
>
>
>
>
> *Sujit Sakre*
>
>
>
> On 24 January 2017 at 22:34, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> a bit more information would be useful. Are you using event time? Is the
> Flink program kept running after adding the first batch of events and when
> adding the second batch, or are these two invocations of your Flink program?
> Do you have a custom timestamp/watermark assigner?
>
> Cheers,
> Aljoscha
>
> On Tue, 24 Jan 2017 at 14:28 Sujit Sakre <sujit.sa...@northgateps.com>
> wrote:
>
> Hi,
>
> We are using a sliding window function to process data read from a Kafka
> stream, using FlinkKafkaConsumer09 to read the data. The window function
> and sink are running correctly.
>
> To test the program, we are generating a stream of data from the command
> line. This works when we add a set of records once. When we add records
> again, it does not work: Flink produces no result, even though the records
> are added to the same Kafka topic from the same command-line instance.
>
> Could you please suggest what could be wrong?
>
> Many thanks.
>
>
> *Sujit Sakre*
>
> This email is sent on behalf of Northgate Public Services (UK) Limited and
> its associated companies including Rave Technologies (India) Pvt Limited
> (together "Northgate Public Services") and is strictly confidential and
> intended solely for the addressee(s).
> If you are not the intended recipient of this email you must: (i) not
> disclose, copy or distribute its contents to any other person nor use its
> contents in any way or you may be acting unlawfully;  (ii) contact
> Northgate Public Services immediately on +44(0)1908 264500 quoting the
> name of the sender and the addressee then delete it from your system.
> Northgate Public Services has taken reasonable precautions to ensure that
> no viruses are contained in this email, but does not accept any
> responsibility once this email has been transmitted.  You should scan
> attachments (if any) for viruses.
>
> Northgate Public Services (UK) Limited, registered in England and Wales
> under number 00968498 with a registered address of Peoplebuilding 2,
> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
> 4NN.  Rave Technologies (India) Pvt Limited, registered in India under
> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
>
