[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15368993#comment-15368993 ]
ASF GitHub Bot commented on FLINK-4019:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70164121

    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
     Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
     Flink on YARN supports automatic restart of lost YARN containers.
    +#### Event Time for Consumed Records
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
    +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
    +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
    +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
    +ascending).
    +
    +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
    +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
    +it can be passed to the consumer in the following way:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    +    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
    +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
    --- End diff --

    Ah, sorry, it's not about the method name. I meant `kinesis = kinesis.assignTimestampsAndWatermarks()`.
    The problem is that `assignTimestampsAndWatermarks()` returns a stream with assigned timestamps.
    So doing
    ```java
    kinesis.assignTimestampsAndWatermarks();
    kinesis.timeWindow(); // <-- this time window won't get the watermarks
    ```
    won't work.


> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-4019
>                 URL: https://issues.apache.org/jira/browse/FLINK-4019
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when Kinesis successfully receives the record:
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
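
For readers following the review comment on `assignTimestampsAndWatermarks()`: the point is that the call returns a new stream and leaves the one it was called on unchanged. The sketch below is a minimal stand-in that needs no Flink dependency. `ToyStream` and `ReassignStreamSketch` are hypothetical names made up for illustration and are not part of Flink's API; only the method name is borrowed from the comment.

```java
public class ReassignStreamSketch {

    /** Hypothetical stand-in for a stream; this is NOT Flink's DataStream. */
    static final class ToyStream {
        private final boolean hasTimestamps;

        ToyStream(boolean hasTimestamps) {
            this.hasTimestamps = hasTimestamps;
        }

        /** Returns a NEW stream that carries timestamps; `this` is left unchanged. */
        ToyStream assignTimestampsAndWatermarks() {
            return new ToyStream(true);
        }

        boolean hasTimestamps() {
            return hasTimestamps;
        }
    }

    public static void main(String[] args) {
        ToyStream kinesis = new ToyStream(false);

        // Return value discarded: `kinesis` still refers to the stream without timestamps.
        kinesis.assignTimestampsAndWatermarks();
        System.out.println(kinesis.hasTimestamps()); // prints "false"

        // Reassigning keeps the stream that carries the timestamps.
        kinesis = kinesis.assignTimestampsAndWatermarks();
        System.out.println(kinesis.hasTimestamps()); // prints "true"
    }
}
```

Under that assumption, the documentation snippet in the diff would need the `kinesis = kinesis.assignTimestampsAndWatermarks(...)` form suggested in the comment so that later operators are applied to the stream that has timestamps and watermarks.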