Hi Gordon,

Thank you for the clarification. In that case, it sounds like I should still provide a watermark generator (via the same extractor interface) to emit watermarks on top of the Kafka-attached timestamps. I see that the doc already has code demonstrating how to do this, so I will start there.
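For reference, here is roughly what I am planning to try, adapted from the example in that doc: a periodic assigner that simply keeps the timestamp the 0.10 consumer already attached and emits watermarks trailing the highest timestamp seen so far. This is an untested sketch; the topic, bootstrap servers, out-of-orderness bound, and the counting window function are just placeholders for my actual job.

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class KafkaEventTimeSketch {

    // Periodic watermark generator that keeps the timestamp the Kafka 0.10
    // consumer already attached (it arrives as previousElementTimestamp) and
    // emits watermarks trailing the highest timestamp seen so far.
    public static class KafkaWatermarkAssigner implements AssignerWithPeriodicWatermarks<String> {

        private static final long MAX_OUT_OF_ORDERNESS_MS = 1000L; // placeholder bound
        private long currentMaxTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            // Keep the Kafka-attached timestamp instead of parsing the element.
            currentMaxTimestamp = Math.max(currentMaxTimestamp, previousElementTimestamp);
            return previousElementTimestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "event-time-test");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("input", new SimpleStringSchema(), props);
        // Attach the watermark generator to the consumer itself so watermarks
        // are generated per Kafka partition.
        consumer.assignTimestampsAndWatermarks(new KafkaWatermarkAssigner());

        env.addSource(consumer)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new AllWindowFunction<String, Integer, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<String> values, Collector<Integer> out) {
                        int count = 0;
                        for (String ignored : values) {
                            count++;
                        }
                        // Emit one count per fired window, just to confirm windows trigger.
                        out.collect(count);
                    }
                })
                .print();

        env.execute("kafka event time sketch");
    }
}

If I understand the docs correctly, attaching the assigner to the consumer (rather than to the stream after addSource) means watermarks are generated per Kafka partition, so I will go with that unless there is a reason not to.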
Thanks,
Jia

On Tue, May 16, 2017 at 10:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Ah, my apologies, some misunderstanding on my side here.
>
> FlinkKafkaConsumer010 attaches the Kafka timestamp to the records, hence a timestamp extractor is not required, BUT you'll still need a watermark generator to produce the watermarks. That should explain why the windows aren't firing.
>
> I agree it's a bit confusing for the Kafka 0.10 consumer at the moment, because timestamp extraction and watermark generation are bound to a single interface. I think there are plans to separate that in the future.
>
> Cheers,
> Gordon
>
>
> On 17 May 2017 at 1:34:31 PM, Jia Teoh (jiat...@gmail.com) wrote:
>
> Hi Gordon,
>
> Thanks for confirming my understanding that the extractor should not have to be defined for 0.10. However, I'm still experiencing the case where not using an extractor results in zero window triggers.
>
> I've verified the timestamps in the Kafka records with the following command:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --from-beginning --property print.timestamp=true
> where the 'input' topic happens to consist of Strings representing the time the record was created. I get output such as the following:
>
> CreateTime:1494998828813 1494998828813
> CreateTime:1494998828901 1494998828901
> CreateTime:1494998828914 1494998828914
> CreateTime:1494998828915 1494998828914
> CreateTime:1494998828915 1494998828915
> CreateTime:1494998829004 1494998829003
> CreateTime:1494998829016 1494998829016
> CreateTime:1494998829016 1494998829016
>
> where CreateTime is the timestamp generated by Kafka and the second value is the record value. In this particular case that happens to be the time the record was created in the producer, which resides on the same machine as the Kafka broker (hence the nearly identical values).
>
> -Jia
>
> On Tue, May 16, 2017 at 9:30 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi Jia!
>>
>> This sounds a bit fishy. The docs mention that there is no need for a timestamp / watermark extractor because with 0.10, the timestamps that come with Kafka records can be used directly to produce watermarks for event time.
>>
>> One quick clarification: did you also check whether the timestamps that come with the Kafka 0.10 records are sound and reasonable?
>>
>> Cheers,
>> Gordon
>>
>>
>> On 17 May 2017 at 4:45:19 AM, Jia Teoh (jiat...@gmail.com) wrote:
>>
>> Hi,
>>
>> I'm trying to use FlinkKafkaConsumer010 as a source for a windowing job on event time, as provided by Kafka. According to the Kafka connector doc (<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>), I've set the time characteristic to event time (streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)) and am using FlinkKafkaConsumer010 along with Kafka 0.10.2. I've also set up windowing: "stream.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(windowMillis)))" (using timeWindowAll appears to be equivalent as well).
>>
>> Using these configurations I can verify that data is read from Kafka. However, the event time windows never trigger, even when data is loaded for much longer than the window size. Is there an additional configuration I am missing?
>>
>> I have verified that the Kafka messages have timestamps. The docs mention that there is no need for a timestamp extractor, but using one to explicitly assign the current time does result in windows being triggered.
>>
>> Thanks,
>> Jia Teoh