Hi everybody,

I've finally reached streaming territory. For a student project I want to
implement CluStream for Flink. I guess this is also a good opportunity to
try out queryable state :)

But I'm having problems with the first steps. My input data is a CSV file of
records. To start with, I just want to window this stream. I don't want to
use an AllWindow because it isn't parallelizable.
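For reference, the non-keyed alternative I'm trying to avoid would look
roughly like this (just a sketch; connectionRecords is the mapped stream
from the code further down, and as far as I understand the windowAll
operator runs with parallelism 1):

// non-parallel variant: one global window operator over the whole stream
connectionRecords
    .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(1000L)))
    .apply(new AllWindowFunction<Tuple2<Long, Vector>, Tuple1<Integer>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window,
                          Iterable<Tuple2<Long, Vector>> values,
                          Collector<Tuple1<Integer>> out) {
            // count all elements of the window on a single task
            int count = 0;
            for (Tuple2<Long, Vector> ignored : values) {
                count++;
            }
            out.collect(new Tuple1<>(count));
        }
    });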

So my first question is: Can I window by processing time, like this:

connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
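As far as I understand, the usual pattern would be to key by some field of
the data and then apply the time window per key, roughly like this (a
sketch only, using the connectionRecords stream from below and then an
apply like the one further down; field 0 is my index column, so every key
would be unique, which is probably not what I want):

// key by a record field, then a 1s tumbling processing-time window per key
connectionRecords
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.milliseconds(1000L)));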

Since I didn't find a way to do that, I added an index column to the CSV and
tried to use a countWindow instead:

DataStreamSource<String> source = env.readTextFile(file.getPath());

// parse each CSV line into (index, feature vector)
DataStream<Tuple2<Long, Vector>> connectionRecords =
        source.map(new MapToVector()).setParallelism(4);

connectionRecords
    .keyBy(0)
    .countWindow(10)
    .apply(new WindowFunction<Tuple2<Long, Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
        @Override
        public void apply(Tuple key,
                          GlobalWindow window,
                          Iterable<Tuple2<Long, Vector>> values,
                          Collector<Tuple1<Integer>> out) throws Exception {
            // just count the elements that ended up in this window
            int sum = 0;
            for (Tuple2<Long, Vector> record : values) {
                sum += 1;
            }
            out.collect(new Tuple1<>(sum));
        }
    })
    .writeAsCsv("text");

To check whether everything works, I just count the elements per window and
write the counts to a CSV file.

Flink generates the output files, but they are all empty. Can you tell me what I did wrong?

Best regards,

Felix
