Hi Gordon,

Thank you for the clarification. In that case, it sounds like I should be
using a custom extractor to emit watermarks in addition to the Kafka
timestamps. I see that the doc already has code demonstrating how to emit
the timestamps, so I will start there.
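
For anyone following along, here is roughly the logic I have in mind, written as a
standalone class so it compiles on its own. In the actual job this would live inside
an AssignerWithPeriodicWatermarks passed to assignTimestampsAndWatermarks; the class
name and the 1-second lag here are my own choices, not anything from the docs:

```java
// Standalone sketch of periodic watermark generation: track the maximum
// timestamp seen so far and emit watermark = max - allowed lag. With the
// Kafka 0.10 consumer the timestamp handed to the assigner is already the
// Kafka-attached one, so extraction just returns it unchanged.
public class BoundedLagWatermarks {
    private final long maxLagMs;
    private long maxSeenTs = Long.MIN_VALUE;

    public BoundedLagWatermarks(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    // Mirrors extractTimestamp(element, previousElementTimestamp): keep
    // track of the largest timestamp and hand the Kafka timestamp back.
    public long extractTimestamp(long kafkaTimestamp) {
        if (kafkaTimestamp > maxSeenTs) {
            maxSeenTs = kafkaTimestamp;
        }
        return kafkaTimestamp;
    }

    // Mirrors getCurrentWatermark(): everything at or before this instant
    // is considered complete, which is what lets event-time windows fire.
    public long getCurrentWatermark() {
        return maxSeenTs - maxLagMs;
    }
}
```

The lag bounds how far out of order records may arrive (note my sample output above
has a couple of slightly out-of-order timestamps), and the watermark never moves
backwards because only the maximum seen timestamp is tracked.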

Thanks,
Jia

On Tue, May 16, 2017 at 10:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Ah, my apologies, some misunderstanding on my side here.
>
> FlinkKafkaConsumer010 attaches the Kafka timestamp to the records, hence
> a timestamp extractor is not required, BUT you’ll still need a watermark
> generator to produce the watermarks. That should explain why the windows
> aren’t firing.
>
> I agree it’s a bit confusing for the Kafka010 consumer at the moment,
> because timestamp extraction and watermark generation are bound to a single
> interface. I think there are plans to separate those in the future.
>
> Cheers,
> Gordon
>
>
> On 17 May 2017 at 1:34:31 PM, Jia Teoh (jiat...@gmail.com) wrote:
>
> Hi Gordon,
>
> Thanks for confirming my understanding that the extractor should not have
> to be defined for 0.10. However, I'm still experiencing the case where not
> using an extractor results in zero window triggers.
>
> I've verified the timestamps in the Kafka records with the following
> command:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
> input --from-beginning --property print.timestamp=true
> where the 'input' topic happens to consist of Strings representing the
> time the record was created. I get output such as the following:
>
> CreateTime:1494998828813        1494998828813
> CreateTime:1494998828901        1494998828901
> CreateTime:1494998828914        1494998828914
> CreateTime:1494998828915        1494998828914
> CreateTime:1494998828915        1494998828915
> CreateTime:1494998829004        1494998829003
> CreateTime:1494998829016        1494998829016
> CreateTime:1494998829016        1494998829016
>
> where CreateTime is the timestamp generated by Kafka and the second value
> is the record value. In this particular case that happens to be the time
> the record was created in the producer, which resides on the same machine
> as the Kafka broker (hence the identical values).
>
> -Jia
>
> On Tue, May 16, 2017 at 9:30 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Jia!
>>
>> This sounds a bit fishy. The docs mention that there is no need for a
>> timestamp / watermark extractor because with 0.10, the timestamps that come
>> with Kafka records can be used directly to produce watermarks for event
>> time.
>>
>> One quick clarification: did you also check whether the timestamps that
>> come with the Kafka 0.10 records are sound and reasonable?
>>
>> Cheers,
>> Gordon
>>
>>
>> On 17 May 2017 at 4:45:19 AM, Jia Teoh (jiat...@gmail.com) wrote:
>>
>> Hi,
>>
>> I'm trying to use KafkaConsumer010 as a source for a windowing job on
>> event time, as provided by Kafka. According to the kafka connector doc (
>> link
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>),
>> I've set the time characteristic to event time (
>> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime))
>> and am using KafkaConsumer010 along with Kafka 0.10.2. I've also set up
>> windowing: "stream.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(windowMillis)))"
>> (using timeWindowAll appears to be equivalent as well).
>>
>> Using these configurations I can verify that data is read from Kafka.
>> However, the event time windows never trigger even when data is loaded for
>> much longer than the window size. Is there an additional configuration I am
>> missing?
>>
>> I have verified that the Kafka messages have timestamps. The docs mention
>> that there is no need for a timestamp extractor, but using one to
>> explicitly assign the current time does result in windows being triggered.
>>
>> Thanks,
>> Jia Teoh
>>
>>
>