Yes, your assumption is right. My TrafficWindow is emitting multiple records and I am looking for a way to iterate over these values and emit another set of multiple records(which would be the computed values from the previous stream).
Thanks a lot for your input Mr. Hueske :) On Fri, Jan 27, 2017 at 9:55 PM, Fabian Hueske <fhue...@gmail.com> wrote: > Hi, > > the window operation is completed after you called apply the first time. > The result is a regular DataStream. > > I assume your TrafficWindow emits multiple records. Otherwise, you'd > probably apply a simple MapFunction after the window. > So you are looking for a way to iterate over all values returned by a > single TrafficWindow call. > > I think the easiest way would be to emit a single record from > TrafficWindow that contains all original records and to unnest and evaluate > the records in a following FlatMapFucntion. > If that does not work, you would need to define another window. > > Best, Fabian > > > 2017-01-27 21:27 GMT+01:00 Abdul Salam Shaikh <abd.salam.sha...@gmail.com> > : > >> Hi everyone, >> >> I have a window definition like this at the moment in snapshot version >> 1.2.0: >> >> final StreamExecutionEnvironment env = StreamExecutionEnvironment.get >> ExecutionEnvironment(); >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> env.setParallelism(1); >> DataStream<String> live = env.addSource(new JsonTestSource()); >> DataStream<FlatObject> jsonToTuple = live.flatMap(new >> RawEventTransformer()); >> >> KeyedStream<FlatObject, String> keyStream = >> jsonToTuple.keyBy(new KeySelector<FlatObject,String>() { >> public String getKey(FlatObject value) throws Exception { >> return value.getIntersectionName(); >> } >> }); >> >> DataStream<FlatObject> flatCorrectedStream = >> keyStream >> .window(GlobalWindows.create()) >> .trigger(new WindowCustomTrigger()) >> .evictor(new WindowEvictor()) >> .apply(new TrafficWindow()); >> flatCorrectedStream.print(); >> >> My apply function generally corrects the raw event streams. >> >> I want to evaluate the corrected stream and generate a stream of >> EvaluatedWindowObjectsStream. >> >> However, using the same operator twice is not an allowed: >> >> DataStream<FlatObject> flatCorrectedStream = >> keyStream >> .window(GlobalWindows.create()) >> .trigger(new WindowCustomTrigger()) >> .evictor(new WindowEvictor()) >> .apply(new TrafficWindow()); >> >> * .apply(new WindowEvaluater());* >> >> I am looking for options where I can achieve the above case. I am looking >> to keep the logic of correcting the streams and evaluating the streams >> separately. Hence, the above case rises. >> >> Thanks! >> >> >> > -- Thanks & Regards, *Abdul Salam Shaikh*