[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367542#comment-15367542 ]
ASF GitHub Bot commented on FLINK-4019:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70059282

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
         * This method is called by {@link ShardConsumer}s.
         *
         * @param record the record to collect
    +    * @param recordTimestamp timestamp to attach to the collected record
         * @param shardStateIndex index of the shard to update in subscribedShardsState;
         *                        this index should be the returned value from
         *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
         *                        when the shard state was registered.
         * @param lastSequenceNumber the last sequence number value to update
         */
    -   protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
    +   protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
            synchronized (checkpointLock) {
                sourceContext.collect(record);
    --- End diff --

    Did you also consider passing the record to the `sourceContext.collectWithTimestamp()` method in addition to passing it to the serialization schema?

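For illustration only, here is a minimal sketch of what the review question might amount to: emitting the record through `collectWithTimestamp()` with the new `recordTimestamp` parameter instead of plain `collect()`. It is not the code from the pull request. The `SourceContext` interface below is a reduced stand-in for Flink's `SourceFunction.SourceContext`, and `RecordingContext`, `Fetcher`, and the use of a `String` in place of `SequenceNumber` are assumptions made so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

public class TimestampedEmitSketch {

    /** Reduced stand-in for Flink's SourceFunction.SourceContext (assumption: only these two methods). */
    interface SourceContext<T> {
        void collect(T element);
        void collectWithTimestamp(T element, long timestamp);
    }

    /** Hypothetical test double that remembers every emitted element and its timestamp. */
    static class RecordingContext<T> implements SourceContext<T> {
        final List<T> elements = new ArrayList<>();
        final List<Long> timestamps = new ArrayList<>();

        @Override
        public void collect(T element) {
            elements.add(element);
            timestamps.add(null); // no timestamp attached
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            elements.add(element);
            timestamps.add(timestamp);
        }
    }

    /** Hypothetical, simplified fetcher; sequence numbers are plain Strings here. */
    static class Fetcher<T> {
        private final Object checkpointLock = new Object();
        private final SourceContext<T> sourceContext;
        final List<String> lastSequenceNumbers = new ArrayList<>();

        Fetcher(SourceContext<T> sourceContext, int shardCount) {
            this.sourceContext = sourceContext;
            for (int i = 0; i < shardCount; i++) {
                lastSequenceNumbers.add(null);
            }
        }

        void emitRecordAndUpdateState(T record, long recordTimestamp,
                                      int shardStateIndex, String lastSequenceNumber) {
            // Emit and update state under one lock so a checkpoint never sees
            // an emitted record without its matching sequence number.
            synchronized (checkpointLock) {
                sourceContext.collectWithTimestamp(record, recordTimestamp);
                lastSequenceNumbers.set(shardStateIndex, lastSequenceNumber);
            }
        }
    }

    public static void main(String[] args) {
        RecordingContext<String> ctx = new RecordingContext<>();
        Fetcher<String> fetcher = new Fetcher<>(ctx, 1);
        fetcher.emitRecordAndUpdateState("payload", 1467900000000L, 0, "seq-1");
        System.out.println(ctx.elements + " @ " + ctx.timestamps);
    }
}
```

With this shape, the Kinesis arrival time would travel with the element as its event timestamp, in addition to being available to the deserialization schema.
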
> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-4019
>                 URL: https://issues.apache.org/jira/browse/FLINK-4019
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when Kinesis successfully receives the record:
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the deserialization schema.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)