Re: Record timestamp from kafka

2018-04-10 Thread Ben Yan
> On Apr 10, 2018, at 7:32 PM, Ben Yan wrote:
>
> Hi Chesnay:
>
> I think it would be better without such a limitation. I want to ask about another problem. When I use BucketingSink (I use AWS S3), the filename of a few files after checkpoint still hasn't changed, resulting in the

Re: Record timestamp from kafka

2018-04-10 Thread Ben Yan
Hi Fabian: I think it would be better without such a limitation. I want to ask about another problem. When I use BucketingSink (I use AWS S3), the filename of a few files after checkpoint still hasn't changed, resulting in the underscore prefix of the final generation of a small number of fi

Re: Record timestamp from kafka

2018-04-10 Thread Chesnay Schepler
You must use a ProcessFunction for this; the timestamps are not exposed in any way to map/flatMap functions. On 10.04.2018 12:29, Ben Yan wrote: Hi Fabian. If I use ProcessFunction, I can get it! But I want to know how to get the Kafka timestamp in methods like flatMap and map of datastrea

Re: Record timestamp from kafka

2018-04-10 Thread Ben Yan
Hi Fabian. If I use ProcessFunction, I can get it! But I want to know how to get the Kafka timestamp in methods like flatMap and map of a DataStream using the Scala programming language. Thanks! Best Ben > On Apr 4, 2018, at 7:00 PM, Fabian Hueske wrote: > > Hi Navneeth, > > Flink's Ka

Re: Record timestamp from kafka

2018-04-04 Thread Fabian Hueske
Hi Navneeth, Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if you configure EventTime for an application [1]. Since Flink treats record timestamps as metadata, they are not directly accessible by most functions. You can implement a ProcessFunction [2] to access the time

Re: Record timestamp from kafka

2018-03-29 Thread Ben Yan
Hi, is that what you mean? See: https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145