Hi,
instead of writing to files, could you please simply output a value using
the Collector and then write the result stream of the window operation to a
sink (such as a file sink) to see how many windows are being processed?
Having side effects (especially output) in user functions can lead to
programs with quite unexpected behaviour, and I would strongly discourage
doing that.
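
For example, roughly (a sketch based on the code you posted below; the output
path and the simple writeAsText() sink are only placeholders, any sink would
do):

    // inside CustomSlidingWindowFunction.apply(), instead of calling
    // writeWindowToFile(...), emit the selected records via the Collector:
    if (d.after(x) && d.before(y)) {
        for (Tuple5<String, String, Float, Float, String> tr : input) {
            out.collect(tr);
        }
    }

    // on the calling side, send the result stream of the window operation
    // to a sink instead of writing files inside the user function:
    DataStream<Tuple5<String, String, Float, Float, String>> windowed =
            keyedStream.keyBy(4)
                    .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                    .apply(new CustomSlidingWindowFunction());

    windowed.writeAsText("/tmp/window-output", FileSystem.WriteMode.OVERWRITE);

That way every element that reaches the sink corresponds to a window that was
actually evaluated, and you can count them there.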

Cheers,
Aljoscha

On Tue, 10 Jan 2017 at 13:44 Sujit Sakre <sujit.sa...@northgateps.com>
wrote:

> Hi,
>
> In the link (
> http://stackoverflow.com/questions/41143518/sliding-processing-time-window-computes-inconsistent-results),
> Fabian has mentioned that if Event Time is used, consistent results are
> possible.
>
> However, that is not the case for us. We are getting very inconsistent results.
>
> Please suggest.
>
>
> *Sujit Sakre*
>
>
> On 9 January 2017 at 22:27, Sujit Sakre <sujit.sa...@northgateps.com>
> wrote:
>
> Hi,
>
> We are using a sliding event time window with the Kafka consumer. The window
> size is 6 minutes and the slide is 2 minutes. We have written a window
> function that selects particular windows out of the many windows produced for
> a keyed stream; e.g. we select about 16 windows based on a particular
> condition.
>
> On a normal execution, 16 windows satisfy the condition in the window
> function mentioned above. We write each of these windows to a separate file,
> named after the window start and end times.
>
> The code is as below:
>
> Calling code
>
>
> public class RealTimeProcessingSlidingWindow {
>
>     public static void main(String[] args) throws Exception {
>
>         // set up the execution environment
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // configure the Kafka consumer
>         Properties kafkaProps = new Properties();
>         kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
>         kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
>         kafkaProps.setProperty("group.id", DEMO_GROUP);
>         // always read the Kafka topic from the start
>         kafkaProps.setProperty("auto.offset.reset", "earliest");
>
>         FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
>                 new FlinkKafkaConsumer09<>(
>                         "test",            // Kafka topic name
>                         new dataSchema(),
>                         kafkaProps);
>
>         DataStream<Tuple5<String, String, Float, Float, String>> stream1 =
>                 env.addSource(consumer);
>
>         DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
>                 stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());
>
>         keyedStream.keyBy(4)
>                 // 6 min window with 2 min slide
>                 .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
>                 .apply(new CustomSlidingWindowFunction());
>
>         env.execute("Sliding Event Time Window Processing");
>     }
> }
>
>
> public static class CustomSlidingWindowFunction implements
>         WindowFunction<Tuple5<String, String, Float, Float, String>,
>                 Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {
>
>     @Override
>     public void apply(Tuple key, TimeWindow window,
>             Iterable<Tuple5<String, String, Float, Float, String>> input,
>             Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
>
>         // index the window's records by the (trimmed) value of field f1
>         HashMap<String, Tuple5<String, String, Float, Float, String>> windowMap =
>                 new HashMap<String, Tuple5<String, String, Float, Float, String>>();
>         for (Tuple5<String, String, Float, Float, String> wr : input) {
>             windowMap.put(wr.f1.toString().trim(), wr);
>         }
>
>         ...
>
>         SimpleDateFormat sf = new SimpleDateFormat(IndianDateTimeFormat);
>
>         if (windowMap.containsKey(tk)) {
>             Tuple5<String, String, Float, Float, String> t = windowMap.get(tk);
>
>             Date d = sf.parse(t.f0.trim());
>
>             ...
>
>             // Condition for selecting a window
>             if (d.after(x) && d.before(y)) {
>                 // Write the window output to a separate file named after window Lat and Lon
>                 writeWindowToFile(t, window, input);
>             }
>         }
>     }
>
>     // Get a buffered writer that appends to the given file
>     private static synchronized BufferedWriter getWriter(String fileName) throws IOException {
>         return new BufferedWriter(new FileWriter(fileName, true));
>     }
>
>     // Writes an entire window to file for the records in that window
>     private static synchronized void writeWindowToFile(
>             Tuple5<String, String, Float, Float, String> target, TimeWindow window,
>             Iterable<Tuple5<String, String, Float, Float, String>> input) throws IOException {
>         // Create a file to write the window to
>         String fileName = target.f2.toString() + "-" + target.f3.toString() + ".csv";
>         BufferedWriter br = getWriter(fileName);
>
>         // Iterate and put the records in the file
>         for (Tuple5<String, String, Float, Float, String> tr : input) {
>             br.write(tr.f1.toString().trim() + ", "
>                     + convertLongIntoDate(window.getStart()) + ", "
>                     + convertLongIntoDate(window.getEnd()) + ", "
>                     + tr.f0 + ", " + tr.f2 + ", " + tr.f3 + '\n');
>         }
>         // flush the writer and close it
>         br.close();
>     }
> }
>
> We have written the code to be thread-safe while creating and writing to the
> files.
>
> If we execute this code multiple times on the Kafka stream (with a given set
> of records), most of the time we get 16 files with the corresponding window
> records, which is the correct behavior.
>
> However, sometimes only 4 files, or 1 file, or some other number less than 16
> gets created, seemingly at random; this is anomalous behavior.
>
> What could be the cause of such behavior? How do we resolve this?
>
> Please could you help identify the cause and suggest a solution?
>
> Thanks.
>
>
> *Sujit Sakre*
>
>
>
