Hi, what about using BoundedOutOfOrdernessGenerator? Why did that not work for your case?
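For reference, the usual bounded-out-of-orderness rule emits a watermark equal to the highest timestamp seen so far minus a fixed bound; it never multiplies the timestamp. Below is a minimal sketch of that rule in plain Java, written without Flink so that it runs with no dependencies. The class name BoundedWatermarkSketch, the 10-second bound, the sample event times and the UTC time zone are assumptions made for illustration and are not taken from your job. The sketch also computes the doubled variant from the quoted code so the two can be compared.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical, Flink-free illustration of bounded-out-of-orderness watermarks.
class BoundedWatermarkSketch {

    // Assumed bound: tolerate events that arrive up to 10 seconds late.
    // Expressed in milliseconds, the same unit as the event timestamps.
    static final long MAX_OUT_OF_ORDERNESS_MS = 10_000L;

    // Same date pattern as in the quoted code.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    // Highest event timestamp seen so far. The offset keeps the first
    // subtraction in boundedWatermark() from underflowing.
    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    // Parses the event time (assumed to be UTC) and tracks the maximum.
    // A malformed string throws DateTimeParseException instead of being ignored.
    long extractTimestamp(String eventTime) {
        long timestamp = LocalDateTime.parse(eventTime, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Usual rule: highest timestamp seen minus the allowed lateness.
    long boundedWatermark() {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }

    // Variant from the quoted code, kept only for comparison.
    // Call it only after at least one event has been seen.
    long doubledWatermark() {
        return currentMaxTimestamp * 2 - MAX_OUT_OF_ORDERNESS_MS;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch generator = new BoundedWatermarkSketch();
        // Made-up sample events; the second one arrives out of order.
        String[] events = {
            "30-01-2017 17:20:00", "30-01-2017 17:19:55", "30-01-2017 17:20:05"
        };
        for (String event : events) {
            long ts = generator.extractTimestamp(event);
            System.out.println("event=" + Instant.ofEpochMilli(ts)
                    + " bounded=" + Instant.ofEpochMilli(generator.boundedWatermark())
                    + " doubled=" + Instant.ofEpochMilli(generator.doubledWatermark()));
        }
    }
}
```

With event times in January 2017, the doubled value lands in the year 2064. That would be consistent with the symptom described in the thread: once such a watermark has passed, records added later count as late. Note also that the timestamps are in milliseconds, so the bound needs to be in milliseconds as well, whereas the comment in the quoted code says the constant is set in seconds.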
Cheers,
Aljoscha

On Mon, 30 Jan 2017 at 17:20 Sujit Sakre <sujit.sa...@northgateps.com> wrote:

> Hi Robert, Aljoscha,
>
> Many thanks for pointing this out. Watermark generation is the problem. It is
> generating timestamps far ahead of the current year because our code, which
> tried to cover all records, inadvertently produced a very large watermark.
>
> We have tried fixing this with other combinations of generating
> watermarks; however, we are unable to find the right combination and end up
> not processing at least one record from our dataset.
>
> Is there a formula or algorithm for generating the right watermark? Please
> could you suggest one.
>
> Thanks again.
>
> *Sujit Sakre*
>
> On 26 January 2017 at 20:17, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi,
> I would guess that the watermark generation does not work as expected.
> I would recommend logging the extracted timestamps and the watermarks to
> understand how time is progressing, and when watermarks are generated to
> trigger a window computation.
>
> On Tue, Jan 24, 2017 at 6:53 PM, Sujit Sakre <sujit.sa...@northgateps.com>
> wrote:
>
> Hi Aljoscha,
>
> Thanks.
>
> Yes, we are using Event Time.
> Yes, the Flink program is kept running in the IDE (Eclipse) and is not
> closed after the first batch of events or when adding the second batch.
> Yes, we do have a custom timestamp/watermark assigner, implemented as
> BoundedOutOfOrdernessGenerator2.
>
> Are we using the properties for Kafka correctly?
> We are using Flink 1.1.1 and the Flink Kafka connector
> flink-connector-kafka-0.9_2.11.
>
> More about the behavior:
> I have noticed that sometimes, even after the first write to the Kafka
> queue and while the Flink program is running, it does not process the
> queue immediately. We need to restart. This is quite random.
>
> Following is the rough outline of our code.
>
> public class SlidingWindow2 {
>
>     public static void main(String[] args) throws Exception {
>
>         // set up the execution environment
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // configure the Kafka consumer
>         Properties kafkaProps = new Properties();
>         kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>         kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>         kafkaProps.setProperty("group.id", "demo");
>         // always read the Kafka topic from the start
>         kafkaProps.setProperty("auto.offset.reset", "earliest");
>
>         FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
>             new FlinkKafkaConsumer09<>(
>                 "test", // kafka topic name
>                 new dataSchema(),
>                 kafkaProps);
>         DataStream<Tuple5<String, String, Float, Float, String>> stream1 =
>             env.addSource(consumer);
>         DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
>             stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());
>
>         keyedStream.keyBy(4)
>             .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
>             .apply(new CustomSlidingWindowFunction());
>
>         env.execute("Sliding Event Time Window Processing");
>     }
> }
>
> public static class CustomSlidingWindowFunction implements
>         WindowFunction<Tuple5<String, String, Float, Float, String>,
>             Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {
>
>     @Override
>     public void apply(Tuple key, TimeWindow window,
>             Iterable<Tuple5<String, String, Float, Float, String>> input,
>             Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
>
>         ....
>     }
> }
>
> // Implemented custom Periodic Watermark as below from
> public static class BoundedOutOfOrdernessGenerator2 implements
>         AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {
>
>     private static final long serialVersionUID = 1L;
>
>     private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
>
>     private long currentMaxTimestamp;
>
>     @Override
>     public long extractTimestamp(Tuple5<String, String, Float, Float, String> element,
>             long previousElementTimestamp) {
>         //System.out.println("inside extractTimestamp");
>         Date parseDate = null;
>         SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
>         try {
>             parseDate = dateFormat.parse(element.f0);
>         } catch (ParseException e) {
>             e.printStackTrace();
>         }
>         long timestamp = parseDate.getTime();
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         // return the watermark as twice the current highest timestamp minus the
>         // out-of-orderness bound
>         // this is because it is not covering the lateness sufficiently; now it does
>         // in future this may be multiple of 3 or more if necessary to cover the gap
>         // in records received
>         return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
>     }
> }
>
> *Sujit Sakre*
>
> On 24 January 2017 at 22:34, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> a bit more information would be useful. Are you using event-time? Is the
> Flink program kept running after adding the first batch of events and when
> adding the second batch, or is it two invocations of your Flink program? Do
> you have a custom timestamp/watermark assigner?
>
> Cheers,
> Aljoscha
>
> On Tue, 24 Jan 2017 at 14:28 Sujit Sakre <sujit.sa...@northgateps.com>
> wrote:
>
> Hi,
>
> We are using a sliding window function to process data read from a Kafka
> stream. We are using FlinkKafkaConsumer09 to read the data. The window
> function and sink are running correctly.
>
> To test the program, we are generating a stream of data from the command line.
> This works when we add a set of records once. When we add records again, it
> does not work: Flink produces no result, even though the records are added to
> the same Kafka topic from the same command-line instance.
>
> Please could you suggest what could be wrong.
>
> Many thanks.
>
> *Sujit Sakre*
>
> This email is sent on behalf of Northgate Public Services (UK) Limited and
> its associated companies including Rave Technologies (India) Pvt Limited
> (together "Northgate Public Services") and is strictly confidential and
> intended solely for the addressee(s).
> If you are not the intended recipient of this email you must: (i) not
> disclose, copy or distribute its contents to any other person nor use its
> contents in any way or you may be acting unlawfully; (ii) contact
> Northgate Public Services immediately on +44(0)1908 264500
> <+44%201908%20264500> quoting the name of the sender and the addressee
> then delete it from your system.
> Northgate Public Services has taken reasonable precautions to ensure that
> no viruses are contained in this email, but does not accept any
> responsibility once this email has been transmitted. You should scan
> attachments (if any) for viruses.
>
> Northgate Public Services (UK) Limited, registered in England and Wales
> under number 00968498 with a registered address of Peoplebuilding 2,
> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
> 4NN. Rave Technologies (India) Pvt Limited, registered in India under
> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.