antssilva96 commented on code in PR #140:
URL: 
https://github.com/apache/flink-connector-aws/pull/140#discussion_r1670116361


##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -217,6 +217,38 @@ properties by providing a value for 
`ConsumerConfigConstants.STREAM_INITIAL_TIME
     If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined 
then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
     (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` 
given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without 
given a pattern).
 
+### Configuring starting position for new streams
+
+By default, the Flink Kinesis Consumer handles new streams the same way it 
handles a new shard for an existing stream, and it starts consuming from the 
earliest record (same behaviour as TRIM_HORIZON).
+
+This behaviour is fine if you're consuming from a stream that you don't want 
to lose any data from, but if you're consuming from a stream with a large 
retention and where it is fine to start consuming from "now",
+or more generally started from that is defined in 
`ConsumerConfigConstants.STREAM_INITIAL_POSITION`, this was not possible 
before. 
+
+This behaviour can now be enabled by setting the 
`ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to 
true, which will make ALL new streams "reset" to consume from the initial 
position
+instead of starting from the beginning. 
+
+If you just want to force a particular new stream to start consuming from the 
defined `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, you can use the 
`ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property 
(described below) instead.
+
+### Resetting specific streams to the starting position
+
+One of the features of the Flink Kinesis Consumer is that it keeps track of 
the offset that the application is at for each shard, so that if the 
application is restarted we can start consuming from that offset
+when restoring from snapshot. 
+
+This is the ideal behaviour most of the time, but what if you want to jump to 
`LATEST` or go back to `TRIM_HORIZON` for a stream that is already being 
tracked by the Flink Kinesis Consumer? 
+
+You can now do this via the 
`ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property, 
which expects a comma separated list of strings referring to the names of the 
Kinesis Streams to reset.
+
+For example, if you configure your application with
+```
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

Review Comment:
   This was not possible before and is still not possible now 😅 
   
   Even though it would be possible to do that with minimal changes, I feel 
like that is another feature request on its own and probably doesn't belong in 
this one.
   The main goal of this change is: **allow users to say when INITIAL position 
should be used regardless of the stored state.**



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to