Ok, it's working! Thanks. Just out of curiosity, why is the println in keyBy printing twice?
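A likely reason for the double print, based on how Flink implements keyBy rather than on anything stated in this thread: the KeySelector runs twice per record. It runs once on the sending side, where the partitioner uses the key to choose the downstream subtask, and once more on the receiving side, where the keyed operator extracts the key again to scope its state. Any side effect in getKey, such as a debug println, therefore runs twice, which is also why a KeySelector has to be deterministic. The plain-Java model below has no Flink dependency. Its class, fields and methods are hypothetical stand-ins, not Flink's API, and it only counts the calls made along those two paths.

```java
import java.util.function.Function;

/** Hypothetical model (not Flink code) of the two places a keyBy key selector is evaluated. */
public class KeySelectorCallModel {
    static int calls = 0;

    // Stand-in for a key selector with a side effect, like a debug println in MinutesKeySelector.
    static final Function<String, String> KEY_SELECTOR = value -> {
        calls++;
        System.out.println("getKey(" + value + ")");
        return value.substring(0, 1);
    };

    // Sending side: the partitioner needs the key to pick a downstream subtask.
    // Simplified: Flink actually hashes keys into key groups first.
    static int selectChannel(String record, int parallelism) {
        String key = KEY_SELECTOR.apply(record);
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Receiving side: the keyed operator extracts the key again to set its state context.
    static String setKeyContext(String record) {
        return KEY_SELECTOR.apply(record);
    }

    public static void main(String[] args) {
        String record = "a-event";
        int channel = selectChannel(record, 4);
        String currentKey = setKeyContext(record);
        System.out.println("channel=" + channel + " key=" + currentKey + " calls=" + calls);
    }
}
```

In a real job, the practical takeaway is to keep the key selector free of side effects and to put debug output in the flatMap or the window function instead.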
On Mon, 31 Jan 2022 at 17:22, John Smith <java.dev....@gmail.com> wrote:
> Oh ok. I was reading here:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/streaming_analytics/#latency-vs-completeness
> and I did a cut and paste lol
>
> Ok, I'll let you know.
>
> On Mon, 31 Jan 2022 at 17:18, Dario Heinisch <dario.heini...@gmail.com> wrote:
>
>> Then you should be using a processing-time window, in your case:
>> TumblingProcessingTimeWindows
>>
>> See
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/
>> for more info.
>>
>> On 31.01.22 23:13, John Smith wrote:
>>
>> Hi Dario, I don't care about event time. I just want to do a tumbling window
>> over processing time, i.e. count whatever I have in the last 5 minutes.
>>
>> On Mon, 31 Jan 2022 at 17:09, Dario Heinisch <dario.heini...@gmail.com> wrote:
>>
>>> Hi John,
>>>
>>> This is because you are using event time (TumblingEventTimeWindows), but
>>> you do not have an event-time watermark strategy.
>>> It is also why I opened
>>> https://issues.apache.org/jira/browse/FLINK-24623, because I feel like
>>> Flink should be throwing an exception on startup in that case.
>>>
>>> Take a look at the documentation at
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
>>> which should have everything.
>>>
>>> > In order to work with event time, Flink needs to know the events
>>> > timestamps, meaning each element in the stream needs to have its event
>>> > timestamp assigned. This is usually done by accessing/extracting the
>>> > timestamp from some field in the element by using a TimestampAssigner.
>>> > Timestamp assignment goes hand-in-hand with generating watermarks,
>>> > which tell the system about progress in event time. You can configure
>>> > this by specifying a WatermarkGenerator.
>>>
>>> Best regards,
>>>
>>> Dario
>>>
>>> On 31.01.22 22:28, John Smith wrote:
>>>
>>> Hi, I have the following job...
>>>
>>> I'm expecting System.out.println(key.toString()) to at least print
>>> something, but nothing prints.
>>>
>>> - .flatMap: Fires and prints my debug message once, as expected.
>>> - .keyBy: Also fires, but prints my debug message twice.
>>> - .apply: Doesn't seem to fire. The debug statement never prints, although
>>>   I'm expecting it to print the key from the keyBy above.
>>>
>>> DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
>>>             WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>         .uid(kafkaTopic).name(kafkaTopic)
>>>         .setParallelism(1)
>>>         .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
>>>         .uid("map-json-logs").name("map-json-logs");
>>>
>>> slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints twice
>>>         .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
>>>         .apply(new WindowFunction<MyEvent, MyEvent, Tuple3<String, String, String>, TimeWindow>() {
>>>             @Override
>>>             public void apply(Tuple3<String, String, String> key, TimeWindow window,
>>>                     Iterable<MyEvent> input, Collector<MyEvent> out) throws Exception {
>>>                 // This should print.
>>>                 System.out.println(key.toString()); // Do nothing for now
>>>             }
>>>         })
>>>         .uid("process").name("process");
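For readers landing on this thread: the change Dario suggested amounts to swapping the window assigner, i.e. .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) instead of TumblingEventTimeWindows. Why the original apply never fired can be sketched with a small plain-Java model of the two trigger rules. This is an illustration under stated assumptions, not Flink code. The class and method names are hypothetical, the window offset is assumed to be 0, and the rules mirror Flink's default EventTimeTrigger (fire once the watermark reaches the window's last timestamp) and ProcessingTimeTrigger (fire once the wall clock reaches it).

```java
/**
 * Hypothetical, simplified model (not Flink code) of when a tumbling window fires.
 * Assumes window offset 0 and non-negative timestamps.
 */
public class WindowFiringModel {

    /** Start of the tumbling window that contains ts. */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** Last timestamp that still belongs to the window (end - 1). */
    static long maxTimestamp(long start, long sizeMs) {
        return start + sizeMs - 1;
    }

    /** Event time: the record's timestamp picks the window; it fires once the watermark reaches its end. */
    static boolean eventTimeFires(long recordTs, long watermark, long sizeMs) {
        long start = windowStart(recordTs, sizeMs);
        return watermark >= maxTimestamp(start, sizeMs);
    }

    /** Processing time: the arrival clock picks the window; it fires once the clock reaches its end. */
    static boolean processingTimeFires(long arrivalTime, long now, long sizeMs) {
        long start = windowStart(arrivalTime, sizeMs);
        return now >= maxTimestamp(start, sizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long recordTs = 1_643_667_000_000L; // arbitrary example epoch millis
        // With noWatermarks() the watermark stays at its initial value, Long.MIN_VALUE.
        long noWatermark = Long.MIN_VALUE;
        System.out.println("event time fires: "
                + eventTimeFires(recordTs, noWatermark, fiveMinutes));
        // Ten minutes of wall-clock time later, the processing-time window has closed.
        System.out.println("processing time fires: "
                + processingTimeFires(recordTs, recordTs + 2 * fiveMinutes, fiveMinutes));
    }
}
```

With WatermarkStrategy.noWatermarks() the watermark never advances past Long.MIN_VALUE, so the event-time check can never pass and the window function is never called, which matches the silent apply in the original job.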