Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70059358

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
         * This method is called by {@link ShardConsumer}s.
         *
         * @param record the record to collect
    +    * @param recordTimestamp timestamp to attach to the collected record
         * @param shardStateIndex index of the shard to update in subscribedShardsState;
         *                        this index should be the returned value from
         *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
         *                        when the shard state was registered.
         * @param lastSequenceNumber the last sequence number value to update
         */
    -   protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
    +   protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
            synchronized (checkpointLock) {
                sourceContext.collect(record);
    --- End diff --

    Ah, sorry, this is actually wrong. Should be using `sourceContext.collectWithTimestamp()`, missed this.
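A minimal sketch of the fix described in the comment above, not the actual patch. It assumes the intent is to pass the new `recordTimestamp` parameter through to `collectWithTimestamp(element, timestamp)` instead of calling `collect(element)`. `SketchSourceContext`, `RecordingContext`, and `EmitSketch` are hypothetical stand-ins so the example compiles without Flink on the classpath; the shard-state update is reduced to a single `long` field, and the real `shardStateIndex` and `SequenceNumber` handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two SourceContext methods discussed; not Flink's real interface. */
interface SketchSourceContext<T> {
    void collect(T element);
    void collectWithTimestamp(T element, long timestamp);
}

/** Hypothetical test double that records every emitted element and its timestamp. */
class RecordingContext<T> implements SketchSourceContext<T> {
    final List<T> elements = new ArrayList<>();
    final List<Long> timestamps = new ArrayList<>(); // null means no timestamp was attached

    @Override
    public void collect(T element) {
        elements.add(element);
        timestamps.add(null);
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        elements.add(element);
        timestamps.add(timestamp);
    }
}

/** Simplified stand-in for the fetcher; state handling is an assumption for illustration. */
class EmitSketch<T> {
    private final Object checkpointLock = new Object();
    private final SketchSourceContext<T> sourceContext;
    long lastSequenceNumber = -1L; // simplified placeholder for the per-shard state

    EmitSketch(SketchSourceContext<T> sourceContext) {
        this.sourceContext = sourceContext;
    }

    void emitRecordAndUpdateState(T record, long recordTimestamp, long sequenceNumber) {
        // Emit and update state under the same lock so a checkpoint never
        // observes a record that was emitted without its state update.
        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            lastSequenceNumber = sequenceNumber;
        }
    }
}
```

With plain `collect(record)`, the `recordTimestamp` argument would be accepted by the method but silently dropped, which is the problem the comment points out.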