Thank you. This though is a little different.

The producer of the kafka message attaches a time stamp
https://issues.apache.org/jira/browse/KAFKA-2511.  I do not see how I can
get to that timestamp through a any stream abstraction over
FlinkKafkaConsumer  API even though it is available here
https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
being used here
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141

All I want to do is this

* Pull from kafka topic . This topic is been written too with a time stamp
on each kafka record.
* Write to hdfs using StreamingSink BUT make buckets that * honor ingestion
time's  water mark. *

Questions is,

*If  we have TimeCharacteristic as IngestionTime,  does the context's
watermark  in   getBucketId(KafkaRecord element, Context context)
in BucketAssigner.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html>
 reflect the kafka record time stamp in
https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
<https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html>
given this "*automatic timestamp assignment and automatic watermark
generation." is done if  *TimeCharacteristic is **IngestionTime*  (* here
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html>)*


Regards.












On Tue, Jan 29, 2019 at 8:42 PM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Vishal
>  May this doc[1] be helpful for you.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
> Best,
> Congxian
>
>
> Vishal Santoshi <vishal.santo...@gmail.com> 于2019年1月30日周三 上午4:36写道:
>
>> It seems from
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
>> that iTimeCharacteristic.IngestionTime should do the trick.
>>
>> Just wanted to confirm that the ingestion time is the event time provided
>> by the kafka producer.
>>
>> On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>>  In case where one needs t to use kafka event time ( ingestion time )
>>> for watermark generation and timestamp extraction is setting
>>> EventTimeCharactersitic  as EventTime enough ?
>>>
>>> Or is this  explicit code required ?
>>>
>>> consumer.assignTimestampsAndWatermarks(new 
>>> AssignerWithPunctuatedWatermarks<KafkaRecord>() {
>>>     @Nullable
>>>     @Override
>>>     public Watermark checkAndGetNextWatermark(KafkaRecord lastElement, long 
>>> extractedTimestamp) {
>>>         return new Watermark(extractedTimestamp);
>>>     }
>>>
>>>     @Override
>>>     public long extractTimestamp(KafkaRecord element, long 
>>> previousElementTimestamp) {
>>>         return previousElementTimestamp;
>>>     }
>>> });
>>>
>>>
>>>
>>>
>>>
>>>

Reply via email to