Hi Fabian:
I think it would be better without such a limitation.I want to consult
another problem. When I use BucketingSink(I use aws s3), the filename of a few
files after checkpoint still hasn't changed, resulting in the underline prefix
of the final generation of a small number of files. After analysis, it is found
that it is due to the eventually consistent of S3.Are there any better
solutions available?thanks
Best
Ben
https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22
<https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22>
> On Apr 10, 2018, at 6:29 PM, Ben Yan <[email protected]> wrote:
>
> Hi Fabian.
>
> If I use ProcessFunction , I can get it! But I want to know that how
> to get Kafka timestamp in like flatmap and map methods of datastream using
> scala programming language.
> Thanks!
>
> Best
> Ben
>
>> On Apr 4, 2018, at 7:00 PM, Fabian Hueske <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> Hi Navneeth,
>>
>> Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if
>> you configure EventTime for an application [1].
>> Since Flink treats record timestamps as meta data, they are not directly
>> accessible by most functions. You can implement a ProcessFunction [2] to
>> access the timestamp of a record via the ProcessFunction's Context object.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>>
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction>
>>
>> 2018-03-30 7:45 GMT+02:00 Ben Yan <[email protected]
>> <mailto:[email protected]>>:
>> hi,
>> Is that what you mean?
>> See :
>> https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145
>>
>> <https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377145#comment-16377145>
>>
>>
>> Best
>> Ben
>>
>>> On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan <[email protected]
>>> <mailto:[email protected]>> wrote:
>>>
>>> Hi,
>>>
>>> Is there way to get the kafka timestamp in deserialization schema? All
>>> records are written to kafka with timestamp and I would like to set that
>>> timestamp to every record that is ingested. Thanks.
>>
>>
>