code-hard-play-harder commented on code in PR #140: URL: https://github.com/apache/flink-connector-aws/pull/140#discussion_r1669689616
##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -217,6 +217,38 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME
 If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern).
+### Configuring starting position for new streams
+
+By default, the Flink Kinesis Consumer handles new streams the same way it handles a new shard for an existing stream, and it starts consuming from the earliest record (same behaviour as TRIM_HORIZON).
+
+This behaviour is fine if you're consuming from a stream that you don't want to lose any data from, but if you're consuming from a stream with a large retention and where it is fine to start consuming from "now",
+or more generally started from that is defined in `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, this was not possible before.
+
+This behaviour can now be enabled by setting the `ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to true, which will make ALL new streams "reset" to consume from the initial position
+instead of starting from the beginning.
+
+If you just want to force a particular new stream to start consuming from the defined `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, you can use the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property (described below) instead.
+
+### Resetting specific streams to the starting position
+
+One of the features of the Flink Kinesis Consumer is that it keeps track of the offset that the application is at for each shard, so that if the application is restarted we can start consuming from that offset
+when restoring from snapshot.
+
+This is the ideal behaviour most of the time, but what if you want to jump to `LATEST` or go back to `TRIM_HORIZON` for a stream that is already being tracked by the Flink Kinesis Consumer?
+
+You can now do this via the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property, which expects a comma separated list of strings referring to the names of the Kinesis Streams to reset.
+
+For example, if you configure your application with
+```
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

Review Comment:
   I was wondering whether different streams can be given different initial positions. Say we add `streamA`, `streamB` and `streamC` as new streams, and I want `streamA` and `streamB` to consume from `LATEST` and `streamC` from `AT_TIMESTAMP`. Is that possible?
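
To make the scenario concrete, here is a minimal sketch of the configuration shape the diff describes: one initial-position value plus a comma-separated list of stream names it applies to. It models only the list parsing, not the connector's actual behaviour. The class name, the method names and both property key strings are hypothetical placeholders, since the diff names the constants but not their string values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class InitialPositionSketch {
    // Placeholder keys (assumption): stand-ins for the ConsumerConfigConstants
    // values named in the diff, whose real string values are not shown there.
    static final String STREAM_INITIAL_POSITION = "example.stream.initpos";
    static final String STREAMS_TO_RESET = "example.streams.to.apply.initpos.to";

    /** Splits the comma-separated stream list, trimming whitespace and dropping empty entries. */
    static List<String> streamsToReset(Properties config) {
        String raw = config.getProperty(STREAMS_TO_RESET, "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    /** Returns the single configured position if the stream is listed, otherwise null. */
    static String resetPositionFor(String stream, Properties config) {
        return streamsToReset(config).contains(stream)
                ? config.getProperty(STREAM_INITIAL_POSITION)
                : null;
    }

    public static void main(String[] args) {
        Properties consumerConfig = new Properties();
        consumerConfig.put(STREAM_INITIAL_POSITION, "LATEST");
        consumerConfig.put(STREAMS_TO_RESET, "streamA, streamB");
        for (String stream : Arrays.asList("streamA", "streamB", "streamC")) {
            System.out.println(stream + " -> " + resetPositionFor(stream, consumerConfig));
        }
    }
}
```

In this sketch `streamA` and `streamB` resolve to `LATEST` and `streamC` resolves to `null`. With a single position value, the shape modelled here has no slot for giving `streamC` a different position such as `AT_TIMESTAMP`.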