gaborgsomogyi commented on code in PR #26154: URL: https://github.com/apache/flink/pull/26154#discussion_r1954824761
########## docs/content/docs/libs/state_processor_api.md:
##########
@@ -514,3 +512,120 @@ savepointWriter OperatorIdentifier.forUid("new-uid")) ... ```

## Table API

### Getting started

Before getting started with the state SQL connector, make sure to review our [Flink SQL](../development/sql.md) guidelines.

IMPORTANT NOTE: The state Table API currently supports only keyed state.

### Keyed State

[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), or partitioned state, is any state that is partitioned relative to a key.

The SQL connector allows users to read arbitrary columns as ValueState, as well as complex state types such as ListState and MapState.
This means that if an operator contains a stateful process function such as:
```java
eventStream
    .keyBy(e -> (Integer) e.key)
    .process(new StatefulFunction())
    .uid("my-uid");

...

public class Account {
    private Integer id;
    public Double amount;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }
}

public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void> {
    private ValueState<Integer> myValueState;
    private ValueState<Account> myAccountValueState;
    private ListState<Integer> myListState;
    private MapState<Integer, Integer> myMapState;

    @Override
    public void open(OpenContext openContext) {
        myValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyValueState", Integer.class));
        myAccountValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyAccountValueState", Account.class));
        myListState = getRuntimeContext().getListState(new ListStateDescriptor<>("MyListState", Integer.class));
        myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("MyMapState", Integer.class, Integer.class));
    }
    ...
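    // --- Illustrative sketch, not part of the original snippet ---
    // A hypothetical processElement that populates the states declared above,
    // so the resulting savepoint actually contains data for the SQL connector
    // to read back. Signatures follow KeyedProcessFunction<Integer, Integer, Void>.
    @Override
    public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception {
        myValueState.update(value);          // single value per key
        myListState.add(value);              // appended list entries per key
        myMapState.put(value, value);        // map entries per key
    }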
}
```

Then it can be read by using the following SQL statement:
```SQL
CREATE TABLE state_table (
  k INTEGER,
  MyValueState INTEGER,
  MyAccountValueState ROW<id INTEGER, amount DOUBLE>,
  MyListState ARRAY<INTEGER>,
  MyMapState MAP<INTEGER, INTEGER>,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'rocksdb',
  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
  'operator.uid' = 'my-uid'
);
```

### Connector options

#### General options

| Option             | Required | Default | Type                                           | Description |
|--------------------|----------|---------|------------------------------------------------|-------------|
| connector          | required | (none)  | String                                         | Specifies which connector to use; here it should be 'savepoint'. |
| state.backend.type | required | (none)  | Enum. Possible values: hashmap, rocksdb, forst | Defines the state backend used for state reading. This must match the value defined in the Flink job which created the savepoint or checkpoint. |
| state.path         | required | (none)  | String                                         | Defines the state path used for state reading. Any filesystem supported by Flink can be used here. |
| operator.uid       | optional | (none)  | String                                         | Defines the operator UID used for state reading (cannot be used together with the UID hash). |
| operator.uid.hash  | optional | (none)  | String                                         | Defines the operator UID hash used for state reading (cannot be used together with the UID). |

#### Connector options for column ‘#’

Review Comment:
   This is a generic pattern across SQL connectors without deep explanation. See datagen...

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
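As a follow-up usage sketch (not part of the quoted diff): once `state_table` is declared as in the example, the state columns behave like ordinary table columns, so standard Flink SQL can post-process them. The column names below follow the earlier example; `CARDINALITY` is a built-in Flink SQL function for array length, and ROW fields are accessed with dot notation:

```SQL
-- Hypothetical query; assumes chk-1 actually contains keyed state for 'my-uid'
SELECT
  k,
  MyValueState,
  MyAccountValueState.amount AS account_amount,
  CARDINALITY(MyListState)   AS list_size
FROM state_table;
```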