Great! Thanks for letting us know. On Wed, 11 Jan 2017 at 12:44 Sujit Sakre <sujit.sa...@northgateps.com> wrote:
> Hi Aljoscha, > > I have realized that the output stream is not defined separately in the > code below, and hence the input values are getting in the sink. After > defining a separate output stream it works. > > We have now confirmed that the windows are processed separately as per the > groupings. > > Thanks. > > > *Sujit Sakre* > > > On 10 January 2017 at 22:10, Sujit Sakre <sujit.sa...@northgateps.com> > wrote: > > Hi Aljoscha, > > Thanks. > > I have used the following code for testing: > > main > > keyedStream.keyBy(4) > .window(SlidingEventTimeWindows.of(Time.minutes6), Time.minutes(2))) // > 6 min window with 2 min sliding window > .apply(new CustomSlidingWindowFunction()); > > keyedStream.addSink(new SinkFunction<Tuple5<String, String, Float, Float, > String>>() { > /** > * > */ > private static final long serialVersionUID = 1L; > > public void invoke(Tuple5<String, String, Float, Float, String> value) { > System.out.println(value.f1.toString().trim()+", " + > value.f0 + ", "+value.f2 + ", " + value.f3); > } > }); > > > in WindowFunction apply > > ... > > // Condition for selecting a window > if(d.after(x) && d.before(y)){ > > for (Tuple5<String, String, Float, > Float, String> tr: input){ > // Write the window to Collector > out.collect(new Tuple5<>(tr.f0, tr.f1, tr.f2, tr.f3, tr.f4)); > } > > I am getting all input records instead of those windows selected by > the condition. Is there something I am doing wrong? Does this need to be > done in a different way? > > Please let me know. > > Thanks. > > > > *Sujit Sakre* > > > On 10 January 2017 at 20:24, Aljoscha Krettek <aljos...@apache.org> wrote: > > Hi, > instead of writing to files, could you please simply output a value using > the Collector and then write the result stream of the window operation to a > sink (such as a file sink) to see how many windows are being processed. > Having side effects (especially output) in user functions can lead to > programs with quite unexpected behaviour and I would highly discourage > doing that. > > Cheers, > Aljoscha > > On Tue, 10 Jan 2017 at 13:44 Sujit Sakre <sujit.sa...@northgateps.com> > wrote: > > Hi, > > In the link ( > http://stackoverflow.com/questions/41143518/sliding-processing-time-window-computes-inconsistent-results), > Fabian has mentioned that if Event Time is used, consistent results are > possible. > > However, that's not the case with us. We are getting very random results. > > Please suggest. > > > *Sujit Sakre* > > > On 9 January 2017 at 22:27, Sujit Sakre <sujit.sa...@northgateps.com> > wrote: > > Hi, > > We are using Sliding Event Time Window with Kafka Consumer. The window > size is 6 minutes, and slide is 2 minutes. We have written a window > function to select a particular window out of multiple windows for a keyed > stream, e.g. we select about 16 windows out of multiple windows for the > keyed stream based on a particular condition. > > Upon a normal execution, we get 16 windows for processing inside the > condition (in window function mentioned). These windows we are putting in > different files, named after window start and end times. > > the code is as below: > > Calling code > > > public class RealTimeProcessingSlidingWindow{ > > public static void main(String[] args) throws Exception { > > // set up the execution environment > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > // configure the Kafka consumer > Properties kafkaProps = new Properties(); > kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST); > kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER); > kafkaProps.setProperty("group.id", DEMO_GROUP); > // always read the Kafka topic from the start > kafkaProps.setProperty("auto.offset.reset" ,"earliest"); > > FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, > String>> consumer = new FlinkKafkaConsumer09<>( > "test", // kafka topic name > new dataSchema(), > kafkaProps); > DataStream<Tuple5<String, String, Float, Float, String>> > stream1 = env.addSource(consumer); > DataStream<Tuple5<String, String, Float, Float, String>> > keyedStream = stream1.assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessGenerator2()); > > keyedStream.keyBy(4) > .window(SlidingEventTimeWindows.of(Time.minutes6), Time.minutes(2))) // > 6 min window with 2 min sliding window > .apply(new CustomSlidingWindowFunction()); > > env.execute("Sliding Event Time Window Processing"); > > } > } > > > public static class CustomSlidingWindowFunction implements > WindowFunction<Tuple5<String, String, Float, Float, String>, Tuple5<String, > String, Float, Float, String>, Tuple, TimeWindow>{ > > @Override > public void apply(Tuple key, TimeWindow window, Iterable<Tuple5<String, > String, Float, Float, String>> input, > Collector<Tuple5<String, String, Float, Float, String>> out) throws > Exception { > > HashMap<String, Tuple5<String, String, Float, Float, String>> windowMap= > new HashMap<String,Tuple5<String, String, Float, Float, String>>(); > for (Tuple5<String, String, Float, Float, String> wr: input){ > windowMap.put(wr.f1.toString().trim(), wr); > } > > ... > > SimpleDateFormat sf = new SimpleDateFormat(IndianDateTimeFormat); > > if(windowMap.containsKey(tk)){ > Tuple5<String, String, Float, Float, String> t = (Tuple5<String, String, > Float, Float, String>) windowMap.get(tk); > > Date d = sf.parse(t.f0.trim()); > > ... > > // Condition for selecting a window > if(d.after(x) && d.before(y)){ > // Write the window output to separate files named after window Lat and Lon > writeWindowToFile(t, window, input); > } > } > } > } > > // Get the buffered writer > private static synchronized BufferedWriter getWriter(String fileName) > throws IOException{ > return new BufferedWriter(new FileWriter(fileName, true)); > } > // Writes an entire window to file for the records in that window > private static synchronized void writeWindowToFile(Tuple5<String, String, > Float, Float, String> target, TimeWindow window, Iterable<Tuple5<String, > String, Float, Float, String>> input) throws IOException{ > // Create a file to write a window to > String fileName = target.f2.toString() + "-" + target.f3.toString()+".csv"; > BufferedWriter br = getWriter(fileName); > > // Iterate and put the records in file > for (Tuple5<String, String, Float, Float, String> tr: input){ > br.write(tr.f1.toString().trim()+", "+ > convertLongIntoDate(window.getStart())+", > "+convertLongIntoDate(window.getEnd())+", "+ > tr.f0+", "+tr.f2+", "+tr.f3+'\n'); > } > // flush the writer and close it > br.close(); > } > > We have written the code to be threadsafe while creating and writing to > file > > In this code, If we execute the code multiple times on the Kafka Stream > (with certain records) most times we get 16 files with corresponding window > records, which is the correct behavior. > > However sometimes only 4 files get created or 1 file or any number less > than 16 gets created randomly, this is anomalous behavior. > > What could be the cause of such behavior? How do we resolve this? > > Please, could you identify and suggest a solution/s. > > Thanks. > > > *Sujit Sakre* > > > > This email is sent on behalf of Northgate Public Services (UK) Limited and > its associated companies including Rave Technologies (India) Pvt Limited > (together "Northgate Public Services") and is strictly confidential and > intended solely for the addressee(s). > If you are not the intended recipient of this email you must: (i) not > disclose, copy or distribute its contents to any other person nor use its > contents in any way or you may be acting unlawfully; (ii) contact > Northgate Public Services immediately on +44(0)1908 264500 > <+44%201908%20264500> quoting the name of the sender and the addressee > then delete it from your system. > Northgate Public Services has taken reasonable precautions to ensure that > no viruses are contained in this email, but does not accept any > responsibility once this email has been transmitted. You should scan > attachments (if any) for viruses. > > Northgate Public Services (UK) Limited, registered in England and Wales > under number 00968498 with a registered address of Peoplebuilding 2, > Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2 > 4NN. Rave Technologies (India) Pvt Limited, registered in India under > number 117068 with a registered address of 2nd Floor, Ballard House, Adi > Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001. > > > > This email is sent on behalf of Northgate Public Services (UK) Limited and > its associated companies including Rave Technologies (India) Pvt Limited > (together "Northgate Public Services") and is strictly confidential and > intended solely for the addressee(s). > If you are not the intended recipient of this email you must: (i) not > disclose, copy or distribute its contents to any other person nor use its > contents in any way or you may be acting unlawfully; (ii) contact > Northgate Public Services immediately on +44(0)1908 264500 > <+44%201908%20264500> quoting the name of the sender and the addressee > then delete it from your system. > Northgate Public Services has taken reasonable precautions to ensure that > no viruses are contained in this email, but does not accept any > responsibility once this email has been transmitted. You should scan > attachments (if any) for viruses. > > Northgate Public Services (UK) Limited, registered in England and Wales > under number 00968498 with a registered address of Peoplebuilding 2, > Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2 > 4NN. Rave Technologies (India) Pvt Limited, registered in India under > number 117068 with a registered address of 2nd Floor, Ballard House, Adi > Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001. >