Hi Fabian,

I think it would be better without such a limitation.

I would also like to ask about another problem. When I use BucketingSink (writing to AWS S3), a few files are not renamed after a checkpoint, so a small number of the final files still carry the underscore prefix. After some analysis, I found that this is caused by the eventual consistency of S3. Is there a better solution available?

Thanks
Best
Ben

https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22

> On Apr 10, 2018, at 6:29 PM, Ben Yan <yan.xiao.bin.m...@gmail.com> wrote:
>
> Hi Fabian.
>
> If I use a ProcessFunction, I can get it! But I would like to know how
> to get the Kafka timestamp in methods like flatMap and map of a DataStream
> using the Scala programming language.
> Thanks!
>
> Best
> Ben
>
>> On Apr 4, 2018, at 7:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Navneeth,
>>
>> Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if
>> you configure EventTime for an application [1].
>> Since Flink treats record timestamps as meta data, they are not directly
>> accessible by most functions. You can implement a ProcessFunction [2] to
>> access the timestamp of a record via the ProcessFunction's Context object.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>>
>> 2018-03-30 7:45 GMT+02:00 Ben Yan <yan.xiao.bin.m...@gmail.com>:
>> Hi,
>> Is that what you mean?
>> See:
>> https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145
>>
>> Best
>> Ben
>>
>>> On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Is there a way to get the Kafka timestamp in the deserialization schema? All
>>> records are written to Kafka with a timestamp, and I would like to set that
>>> timestamp on every record that is ingested. Thanks.
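
For reference, here is a minimal Scala sketch of the ProcessFunction approach described in Fabian's reply. It assumes a `DataStream[String]` read with the Kafka 0.10 consumer, EventTime configured on the environment, and flink-streaming-scala on the classpath. The names `Stamped`, `AttachTimestamp`, `TimestampExample` and `withTimestamps` are made up for illustration and are not part of Flink. The idea is to copy the record timestamp into the element itself, so that later map and flatMap calls can read it as an ordinary field.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical output type: the original record paired with its timestamp.
case class Stamped(value: String, kafkaTimestamp: Long)

// Hypothetical function name; it copies the record timestamp into the element.
class AttachTimestamp extends ProcessFunction[String, Stamped] {
  override def processElement(
      value: String,
      ctx: ProcessFunction[String, Stamped]#Context,
      out: Collector[Stamped]): Unit = {
    // ctx.timestamp() returns a java.lang.Long and is null when the record
    // carries no timestamp (for example, when EventTime is not configured).
    val ts: java.lang.Long = ctx.timestamp()
    if (ts != null) {
      out.collect(Stamped(value, ts.longValue()))
    }
    // Records without a timestamp are dropped in this sketch; adjust as needed.
  }
}

object TimestampExample {
  // kafkaStream is assumed to come from env.addSource(...) with a Kafka consumer.
  def withTimestamps(kafkaStream: DataStream[String]): DataStream[Stamped] =
    kafkaStream.process(new AttachTimestamp)
}
```

After this step, something like `withTimestamps(stream).map(s => s.kafkaTimestamp)` should work in plain map and flatMap functions, since the timestamp is then part of the data rather than record metadata.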