davidradl commented on code in PR #26154:
URL: https://github.com/apache/flink/pull/26154#discussion_r1954518842


##########
docs/content/docs/libs/state_processor_api.md:
##########
@@ -514,3 +512,120 @@ savepointWriter
         OperatorIdentifier.forUid("new-uid"))
     ...
 ```
+
+## Table API
+
+### Getting started
+
+Before getting started with the state SQL connector, make sure to review our [Flink SQL](../development/sql.md) guidelines.
+
+IMPORTANT NOTE: The state Table API currently supports only keyed state.
+
+### Keyed State
+
+[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), or partitioned state, is any state that is partitioned relative to a key.
+
+The SQL connector allows users to read arbitrary columns as ValueState, as well as complex state types such as ListState and MapState.
+This means that if an operator contains a stateful process function such as:
+```java
+eventStream
+  .keyBy(e -> (Integer)e.key)
+  .process(new StatefulFunction())
+  .uid("my-uid");
+
+...
+
+public class Account {
+    private Integer id;
+    public Double amount;
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+}
+
+public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void> {
+  private ValueState<Integer> myValueState;
+  private ValueState<Account> myAccountValueState;
+  private ListState<Integer> myListState;
+  private MapState<Integer, Integer> myMapState;
+
+  @Override
+  public void open(OpenContext openContext) {
+    myValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyValueState", Integer.class));
+    myAccountValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyAccountValueState", Account.class));
+    myListState = getRuntimeContext().getListState(new ListStateDescriptor<>("MyListState", Integer.class));
+    myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("MyMapState", Integer.class, Integer.class));
+  }
+  ...
+}
+```
+
+Then its state can be read with the following SQL statement:
+```SQL
+CREATE TABLE state_table (
+  k INTEGER,
+  MyValueState INTEGER,
+  MyAccountValueState ROW<id INTEGER, amount DOUBLE>,
+  MyListState ARRAY<INTEGER>,
+  MyMapState MAP<INTEGER, INTEGER>,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid' = 'my-uid'
+);
+```
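+
+Once the table is defined, the state can be queried like any other table. The following is a minimal sketch, assuming the `state_table` definition above:
+```SQL
+-- Read every key together with its value and map state.
+SELECT k, MyValueState, MyMapState FROM state_table;
+
+-- Count the keys which have a non-empty list state.
+SELECT COUNT(*) FROM state_table WHERE CARDINALITY(MyListState) > 0;
+```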
+
+### Connector options
+
+#### General options
+| Option             | Required | Default | Type                                           | Description |
+|--------------------|----------|---------|------------------------------------------------|-------------|
+| connector          | required | (none)  | String                                         | Specify what connector to use, here it should be 'savepoint'. |
+| state.backend.type | required | (none)  | Enum. Possible values: hashmap, rocksdb, forst | Defines the state backend which must be used for state reading. This must match the value which was defined in the Flink job that created the savepoint or checkpoint. |
+| state.path         | required | (none)  | String                                         | Defines the state path which must be used for state reading. All filesystems which are supported by Flink can be used here. |
+| operator.uid       | optional | (none)  | String                                         | Defines the operator UID which must be used for state reading (can't be used together with operator.uid.hash). |
+| operator.uid.hash  | optional | (none)  | String                                         | Defines the operator UID hash which must be used for state reading (can't be used together with operator.uid). |
+
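+If the operator can only be identified by its hash, `operator.uid.hash` can be used in place of `operator.uid`. A minimal sketch (the hash value and the reduced column list below are placeholders, not taken from the example above):
+```SQL
+CREATE TABLE state_table_by_hash (
+  k INTEGER,
+  MyValueState INTEGER,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid.hash' = '0123456789abcdef0123456789abcdef'
+);
+```
+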
+#### Connector options for column '#'
+| Option              | Required | Default | Type                                    | Description |
+|---------------------|----------|---------|-----------------------------------------|-------------|
+| fields.#.state-name | optional | (none)  | String                                  | Overrides the state name which must be used for state reading. This can be useful when the state name contains characters which are not compliant with SQL column names. |
+| fields.#.state-type | optional | (none)  | Enum. Possible values: list, map, value | Defines the state type which must be used for state reading, including value, list and map. When it's not provided, the connector tries to infer it from the SQL type (ARRAY=list, MAP=map, all others=value). |
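+
+As an illustration of the column options (the state name `my-list-state` here is hypothetical), a state name that is not a valid SQL column name can be mapped onto a column via `fields.#.state-name`, and the state type can be fixed explicitly via `fields.#.state-type`:
+```SQL
+CREATE TABLE renamed_state_table (
+  k INTEGER,
+  MyListState ARRAY<INTEGER>,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid' = 'my-uid',
+  'fields.MyListState.state-name' = 'my-list-state',
+  'fields.MyListState.state-type' = 'list'
+);
+```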

Review Comment:
   nit : tries to be inferred -> tries to infer


