Hi Dario, I don't care about event time. I just want a tumbling window over processing time, i.e. a count of whatever arrived in the last 5 minutes.
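To make that concrete, here is a minimal plain-Java sketch of the bucketing I have in mind. It does not use Flink at all; as far as I know, the Flink assigner for this is TumblingProcessingTimeWindows in place of TumblingEventTimeWindows. The class name, method names, and arrival times below are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of processing-time tumbling windows (no Flink dependency). */
final class ProcessingTimeTumblingCount {
    // Window length: 5 minutes, as in the message above.
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L;

    /** Start of the tumbling window containing a wall-clock time (non-negative epoch millis). */
    static long windowStart(long processingTimeMs) {
        return processingTimeMs - (processingTimeMs % WINDOW_SIZE_MS);
    }

    /** Counts arrivals per window, keyed by window start. */
    static Map<Long, Long> countPerWindow(long[] arrivalTimesMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long t : arrivalTimesMs) {
            counts.merge(windowStart(t), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times: 1, 4, and 6 minutes after the epoch.
        long[] arrivals = {60_000L, 240_000L, 360_000L};
        System.out.println(countPerWindow(arrivals)); // prints {0=2, 300000=1}
    }
}
```

The point is that the window is chosen from the clock at arrival, so no timestamps or watermarks are involved.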
On Mon, 31 Jan 2022 at 17:09, Dario Heinisch <dario.heini...@gmail.com> wrote:
> Hi John
>
> This is because you are using event time (TumblingEventTimeWindows) but
> you do not have an event time watermark strategy.
> It is also why I opened: https://issues.apache.org/jira/browse/FLINK-24623
> because I feel like Flink should be throwing an exception in that case
> on startup.
>
> Take a look at the documentation at:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
> which should have everything.
>
>
> In order to work with event time, Flink needs to know the events'
> timestamps, meaning each element in the stream needs to have its event
> timestamp assigned. This is usually done by accessing/extracting the
> timestamp from some field in the element by using a TimestampAssigner.
>
> Timestamp assignment goes hand-in-hand with generating watermarks, which
> tell the system about progress in event time. You can configure this by
> specifying a WatermarkGenerator.
>
> Best regards,
>
> Dario
> On 31.01.22 22:28, John Smith wrote:
>
> Hi I have the following job... I'm expecting the
> System.out.println(key.toString()); to at least print, but nothing prints.
>
> - .flatMap: Fires and prints my debug message once as expected.
> - .keyBy: Also fires, but prints my debug message twice.
> - .apply: Doesn't seem to fire. The debug statement doesn't seem to print.
>   I'm expecting it to print the key from above keyBy.
>
> DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
>         WatermarkStrategy.noWatermarks(), "Kafka Source")
>     .uid(kafkaTopic).name(kafkaTopic)
>     .setParallelism(1)
>     .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
>     .uid("map-json-logs").name("map-json-logs");
>
> slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints twice
>     .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
>     .apply(new WindowFunction<MyEvent, MyEvent, Tuple3<String, String, String>, TimeWindow>() {
>         @Override
>         public void apply(Tuple3<String, String, String> key, TimeWindow window,
>                 Iterable<MyEvent> input, Collector<MyEvent> out) throws Exception {
>             // This should print.
>             System.out.println(key.toString()); // Do nothing for now
>         }
>     })
>     .uid("process").name("process");
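
As a footnote to Dario's explanation quoted above, here is a rough plain-Java sketch of why the .apply never fires. It assumes the usual event-time rule that a window fires once the watermark reaches the window's last timestamp (end minus one), and that with no watermarks generated the watermark stays at Long.MIN_VALUE. The class and method names are made up for illustration; this is not Flink code.

```java
/** Plain-Java illustration of the event-time firing rule (no Flink dependency). */
final class EventTimeFiringSketch {
    /** An event-time window [start, end) may fire once the watermark reaches end - 1. */
    static boolean canFire(long windowEndMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1; // last timestamp that still belongs to the window
        return watermarkMs >= maxTimestamp;
    }

    public static void main(String[] args) {
        long windowEnd = 5 * 60 * 1000L;   // a 5-minute window starting at 0
        long noWatermark = Long.MIN_VALUE; // assumed watermark when none is ever emitted
        System.out.println(canFire(windowEnd, noWatermark)); // prints false
        System.out.println(canFire(windowEnd, windowEnd));   // prints true
    }
}
```

So with WatermarkStrategy.noWatermarks() the elements are still assigned to windows, but on an unbounded stream nothing ever tells those windows that they are complete.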