[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367542#comment-15367542
 ] 

ASF GitHub Bot commented on FLINK-4019:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70059282
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
    @@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
         * This method is called by {@link ShardConsumer}s.
         *
         * @param record the record to collect
    +    * @param recordTimestamp timestamp to attach to the collected record
         * @param shardStateIndex index of the shard to update in 
subscribedShardsState;
         *                        this index should be the returned value from
         *                        {@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, 
called
         *                        when the shard state was registered.
         * @param lastSequenceNumber the last sequence number value to update
         */
    -   protected void emitRecordAndUpdateState(T record, int shardStateIndex, 
SequenceNumber lastSequenceNumber) {
    +   protected void emitRecordAndUpdateState(T record, long recordTimestamp, 
int shardStateIndex, SequenceNumber lastSequenceNumber) {
                synchronized (checkpointLock) {
                        sourceContext.collect(record);
    --- End diff --
    
    Did you also consider passing the record to the 
`sourceContext.collectWithTimestamp()` method in addition to passing it to the 
serialization schema?


> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-4019
>                 URL: https://issues.apache.org/jira/browse/FLINK-4019
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to