Tzu-Li (Gordon) Tai created FLINK-15769:
-------------------------------------------

Summary: Allow configuring offset startup positions for Stateful Functions Kafka Ingress
Key: FLINK-15769
URL: https://issues.apache.org/jira/browse/FLINK-15769
Project: Flink
Issue Type: New Feature
Components: Stateful Functions
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


Users typically expect to be able to configure where consumption of a Kafka topic starts.
Since the Stateful Functions Kafka ingress sits on top of Flink's Kafka consumer, there are already various options to start with:
* {{GROUP_OFFSETS}} (default): start from the offsets committed to Kafka for the given {{group.id}}
* {{LATEST}}: start from the latest record in the topic
* {{EARLIEST}}: start from the earliest record in the topic
* {{SPECIFIC_OFFSETS}}: provide a map of topic partition -> offset. This is particularly important for bootstrapped state scenarios, where the user would want to start from a specific position consistent with the state bootstrapped in their functions.
* {{TIMESTAMP}}: start from the earliest records whose timestamps are at or after the given timestamp.

The proposed API looks like so:
{code}
KafkaIngressBuilder<T> builder = KafkaIngressBuilder.forIdentifier(...)
    .withTopic(...)
    .withDeserializer(...)
    .withDefaultStartPosition(KafkaIngressStartPosition.fromEarliest()/fromLatest())
    .withSpecificStartOffsets(KafkaIngressStartOffsets.fromMap(Map)/fromTimestamp(Long))
{code}

The {{withDefaultStartPosition}} method is straightforward.
It is kept separate from the {{withSpecificStartOffsets}} method because there can be cases where some partition does not contain the offsets specified by {{withSpecificStartOffsets}}. In that case the ingress needs to fall back to some default configuration, which is the one set by {{withDefaultStartPosition}}.
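
The fallback behaviour described above could be sketched roughly as follows. This is only an illustration of the intended semantics, not the ingress implementation: the class {{StartOffsetResolutionSketch}}, the {{OffsetRange}} and {{DefaultStartPosition}} types, and the string partition keys such as "orders-0" are all hypothetical, and the sketch assumes that "does not contain the offset" means the requested offset is missing from the map or lies outside the partition's currently available range.

```java
import java.util.HashMap;
import java.util.Map;

public class StartOffsetResolutionSketch {

    /** Hypothetical stand-in for the proposed default start positions. */
    enum DefaultStartPosition { EARLIEST, LATEST }

    /** Hypothetical view of the offsets currently available in one partition. */
    static final class OffsetRange {
        final long earliest;
        final long latest;

        OffsetRange(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }

        boolean contains(long offset) {
            return offset >= earliest && offset <= latest;
        }
    }

    /**
     * Picks a start offset per partition: the requested specific offset if the
     * partition contains it, otherwise the configured default position.
     */
    static Map<String, Long> resolve(
            Map<String, OffsetRange> available,
            Map<String, Long> specificOffsets,
            DefaultStartPosition fallback) {
        Map<String, Long> resolved = new HashMap<>();
        for (Map.Entry<String, OffsetRange> entry : available.entrySet()) {
            OffsetRange range = entry.getValue();
            Long requested = specificOffsets.get(entry.getKey());
            if (requested != null && range.contains(requested)) {
                // The partition holds the requested offset, so honour it.
                resolved.put(entry.getKey(), requested);
            } else {
                // Missing or out-of-range offset: use the default position.
                resolved.put(
                        entry.getKey(),
                        fallback == DefaultStartPosition.EARLIEST ? range.earliest : range.latest);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, OffsetRange> available = new HashMap<>();
        available.put("orders-0", new OffsetRange(100L, 500L));
        available.put("orders-1", new OffsetRange(0L, 50L));

        Map<String, Long> specific = new HashMap<>();
        specific.put("orders-0", 250L); // inside the available range
        specific.put("orders-1", 75L);  // beyond the end of the partition

        System.out.println(resolve(available, specific, DefaultStartPosition.EARLIEST));
    }
}
```

In this sketch "orders-0" starts at the requested offset, while "orders-1" falls back to the default position because offset 75 is not available there. Keeping the two settings separate means the fallback is always well defined, regardless of how complete the specific offsets map is.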