OK, it's working! Thanks. Just out of curiosity, why does the println in keyBy
print twice?

On Mon, 31 Jan 2022 at 17:22, John Smith <java.dev....@gmail.com> wrote:

> Oh ok. I was reading here:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/streaming_analytics/#latency-vs-completeness
> and I did a cut and paste lol
>
> OK, I'll let you know.
>
> On Mon, 31 Jan 2022 at 17:18, Dario Heinisch <dario.heini...@gmail.com>
> wrote:
>
>> Then you should be using a processing-time window; in your case,
>> TumblingProcessingTimeWindows.
>>
>> See
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/
>> for more info
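For reference: a processing-time tumbling window assigns each element to a window using the wall-clock time of the machine when the element reaches the window operator, so no timestamps or watermarks are needed. In the job further down, the change is swapping TumblingEventTimeWindows.of(...) for TumblingProcessingTimeWindows.of(...). The Java sketch below does not use Flink at all; it is a simplified model (class and method names are made up for illustration) of how 5-minute tumbling windows bucket elements by arrival time. The alignment formula mirrors, to my knowledge, the one in Flink's TimeWindow.getWindowStartWithOffset, and the arrival times are invented sample values.

```java
// Flink-free sketch: how a tumbling processing-time window buckets elements
// by the wall-clock time at which they arrive. Names are illustrative only.
import java.util.Map;
import java.util.TreeMap;

public class ProcessingTimeTumblingSketch {

    // Align a timestamp down to the start of its window
    // (same arithmetic as TimeWindow.getWindowStartWithOffset, as far as I know).
    static long windowStart(long timestampMillis, long offsetMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis - offsetMillis + sizeMillis) % sizeMillis;
    }

    // Count elements per window, keyed by window start (sorted for readability).
    static Map<Long, Integer> countPerWindow(long[] arrivalTimesMillis, long sizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : arrivalTimesMillis) {
            counts.merge(windowStart(t, 0L, sizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        // Simulated wall-clock arrival times in millis (stand-ins for "now").
        long[] arrivals = {0L, 60_000L, 299_999L, 300_000L, 610_000L};
        System.out.println(countPerWindow(arrivals, fiveMinutes));
    }
}
```

Running main prints {0=3, 300000=1, 600000=1}: three elements land in the first 5-minute window and one in each of the next two.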
>> On 31.01.22 23:13, John Smith wrote:
>>
>> Hi Dario, I don't care about event time. I just want to do a tumbling window
>> over the "processing time", i.e. count whatever I have in the last 5 minutes.
>>
>> On Mon, 31 Jan 2022 at 17:09, Dario Heinisch <dario.heini...@gmail.com>
>> wrote:
>>
>>> Hi John
>>>
>>> This is because you are using event time (TumblingEventTimeWindows) but
>>> you do not have an event-time watermark strategy.
>>> It is also why I opened
>>> https://issues.apache.org/jira/browse/FLINK-24623: I feel like
>>> Flink should be throwing an exception on startup in that case.
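To make the "nothing prints" symptom concrete: an event-time window only calls the window function once the operator's watermark has reached the window's last timestamp (end - 1), and WatermarkStrategy.noWatermarks() leaves the watermark at its initial value on an unbounded source like Kafka. The Java below is a simplified, Flink-free model of that firing rule only; the class and method names are illustrative, not Flink API, and it assumes a single 5-minute window starting at 0.

```java
// Flink-free model of when an event-time window fires.
// Names are illustrative; this is not Flink's trigger implementation.
public class EventTimeFiringSketch {

    // A window [start, end) covers timestamps start..end-1, so its last
    // timestamp (Flink's TimeWindow.maxTimestamp()) is end - 1.
    static boolean eventTimeWindowFires(long windowEndMillis, long currentWatermark) {
        // Fire once the watermark has reached the window's last timestamp.
        return currentWatermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 5 * 60 * 1000L; // window [0, 5 min)

        // noWatermarks(): the watermark stays at its initial value,
        // so the window never fires and apply(...) is never called.
        long noWatermark = Long.MIN_VALUE;
        System.out.println(eventTimeWindowFires(windowEnd, noWatermark)); // false

        // With a real watermark strategy, once the watermark passes end - 1
        // the window fires.
        long advancedWatermark = windowEnd;
        System.out.println(eventTimeWindowFires(windowEnd, advancedWatermark)); // true
    }
}
```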
>>>
>>> Take a look at the documentation at:
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
>>> which should have everything.
>>>
>>> > In order to work with event time, Flink needs to know the events
>>> > timestamps, meaning each element in the stream needs to have its event
>>> > timestamp assigned. This is usually done by accessing/extracting the
>>> > timestamp from some field in the element by using a TimestampAssigner.
>>> >
>>> > Timestamp assignment goes hand-in-hand with generating watermarks,
>>> > which tell the system about progress in event time. You can configure this
>>> > by specifying a WatermarkGenerator.
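The quoted paragraph describes two pieces: a TimestampAssigner that pulls the event time out of each element, and a WatermarkGenerator that turns the timestamps seen so far into a watermark. Below is a Flink-free Java model of the common bounded-out-of-orderness approach, assuming a hypothetical Event type with an epoch-millis field and a 10-second out-of-orderness bound; as far as I know, Flink's built-in bounded-out-of-orderness generator uses the same "max timestamp minus bound minus one" rule. In the real job the counterpart would be something along the lines of slStream.assignTimestampsAndWatermarks(WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((event, ts) -> event.getTimestamp())), where getTimestamp() is a hypothetical accessor on MyEvent.

```java
import java.util.function.ToLongFunction;

// Flink-free model of a timestamp assigner plus a bounded-out-of-orderness
// watermark generator. All names here are illustrative, not Flink API.
public class WatermarkSketch {

    // Hypothetical event type carrying its event time in epoch millis.
    static final class Event {
        private final long eventTimeMillis;

        Event(long eventTimeMillis) {
            this.eventTimeMillis = eventTimeMillis;
        }

        long getEventTimeMillis() {
            return eventTimeMillis;
        }
    }

    static final class BoundedOutOfOrdernessModel<T> {
        private final ToLongFunction<T> timestampAssigner; // plays the TimestampAssigner role
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        BoundedOutOfOrdernessModel(ToLongFunction<T> timestampAssigner, long outOfOrdernessMillis) {
            this.timestampAssigner = timestampAssigner;
            this.outOfOrdernessMillis = outOfOrdernessMillis;
            // Chosen so the watermark is Long.MIN_VALUE before any event is seen.
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        // Per element: extract its timestamp and remember the largest one seen.
        void onEvent(T element) {
            maxTimestamp = Math.max(maxTimestamp, timestampAssigner.applyAsLong(element));
        }

        // Periodically: declare that event time has progressed to max - bound - 1.
        long currentWatermark() {
            return maxTimestamp - outOfOrdernessMillis - 1;
        }
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel<Event> generator =
                new BoundedOutOfOrdernessModel<>(Event::getEventTimeMillis, 10_000L);
        System.out.println(generator.currentWatermark() == Long.MIN_VALUE); // true

        generator.onEvent(new Event(100_000L));
        generator.onEvent(new Event(95_000L)); // out of order: max stays 100_000
        generator.onEvent(new Event(320_000L));
        System.out.println(generator.currentWatermark()); // 309999

        // 309999 >= 299999 (the last millisecond of [0, 5 min)), so a 5-minute
        // event-time window over [0, 300000) would now be allowed to fire.
    }
}
```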
>>>
>>> Best regards,
>>>
>>> Dario
>>> On 31.01.22 22:28, John Smith wrote:
>>>
>>> Hi, I have the following job. I'm expecting the
>>> System.out.println(key.toString()); to at least print, but nothing prints.
>>>
>>> - .flatMap: Fires and prints my debug message once, as expected.
>>> - .keyBy: Also fires, but prints my debug message twice.
>>> - .apply: Doesn't seem to fire; the debug statement never prints. I'm
>>>   expecting it to print the key from the keyBy above.
>>>
>>> DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
>>>                 WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>         .uid(kafkaTopic).name(kafkaTopic)
>>>         .setParallelism(1)
>>>         .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
>>>         .uid("map-json-logs").name("map-json-logs");
>>>
>>> slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints twice
>>>         .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
>>>         .apply(new WindowFunction<MyEvent, MyEvent, Tuple3<String, String, String>, TimeWindow>() {
>>>             @Override
>>>             public void apply(Tuple3<String, String, String> key, TimeWindow window,
>>>                               Iterable<MyEvent> input, Collector<MyEvent> out) throws Exception {
>>>                 // This should print.
>>>                 System.out.println(key.toString());
>>>                 // Do nothing for now
>>>             }
>>>         })
>>>         .uid("process").name("process");
>>>
