Thank you. This though is a little different. The producer of the kafka message attaches a time stamp https://issues.apache.org/jira/browse/KAFKA-2511. I do not see how I can get to that timestamp through a any stream abstraction over FlinkKafkaConsumer API even though it is available here https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html being used here https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
All I want to do is this * Pull from kafka topic . This topic is been written too with a time stamp on each kafka record. * Write to hdfs using StreamingSink BUT make buckets that * honor ingestion time's water mark. * Questions is, *If we have TimeCharacteristic as IngestionTime, does the context's watermark in getBucketId(KafkaRecord element, Context context) in BucketAssigner.html <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html> reflect the kafka record time stamp in https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html <https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html> given this "*automatic timestamp assignment and automatic watermark generation." is done if *TimeCharacteristic is **IngestionTime* (* here <https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html>)* Regards. On Tue, Jan 29, 2019 at 8:42 PM Congxian Qiu <qcx978132...@gmail.com> wrote: > Hi Vishal > May this doc[1] be helpful for you. > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission > Best, > Congxian > > > Vishal Santoshi <vishal.santo...@gmail.com> 于2019年1月30日周三 上午4:36写道: > >> It seems from >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html >> that iTimeCharacteristic.IngestionTime should do the trick. >> >> Just wanted to confirm that the ingestion time is the event time provided >> by the kafka producer. >> >> On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi < >> vishal.santo...@gmail.com> wrote: >> >>> In case where one needs t to use kafka event time ( ingestion time ) >>> for watermark generation and timestamp extraction is setting >>> EventTimeCharactersitic as EventTime enough ? >>> >>> Or is this explicit code required ? >>> >>> consumer.assignTimestampsAndWatermarks(new >>> AssignerWithPunctuatedWatermarks<KafkaRecord>() { >>> @Nullable >>> @Override >>> public Watermark checkAndGetNextWatermark(KafkaRecord lastElement, long >>> extractedTimestamp) { >>> return new Watermark(extractedTimestamp); >>> } >>> >>> @Override >>> public long extractTimestamp(KafkaRecord element, long >>> previousElementTimestamp) { >>> return previousElementTimestamp; >>> } >>> }); >>> >>> >>> >>> >>> >>>