gaborgsomogyi commented on code in PR #26154:
URL: https://github.com/apache/flink/pull/26154#discussion_r1954817814


##########
docs/content/docs/libs/state_processor_api.md:
##########
@@ -514,3 +512,120 @@ savepointWriter
         OperatorIdentifier.forUid("new-uid"))
     ...
 ```
+
+## Table API
+
+### Getting started
+
+Before getting started with the state SQL connector, make sure to review our [Flink SQL](../development/sql.md) guidelines.
+
+IMPORTANT NOTE: The State Table API currently supports keyed state only.
+
+### Keyed State
+
+[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), or partitioned state, is any state that is partitioned relative to a key.
+
+The SQL connector allows users to read arbitrary columns as ValueState, as well as composite state types such as ListState and MapState.
+This means that if an operator contains a stateful process function such as:
+```java
+eventStream
+  .keyBy(e -> (Integer)e.key)
+  .process(new StatefulFunction())
+  .uid("my-uid");
+
+...
+
+public class Account {
+    private Integer id;
+    public Double amount;
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+}
+
+public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void> {
+  private ValueState<Integer> myValueState;
+  private ValueState<Account> myAccountValueState;
+  private ListState<Integer> myListState;
+  private MapState<Integer, Integer> myMapState;
+
+  @Override
+  public void open(OpenContext openContext) {
+    myValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyValueState", Integer.class));
+    myAccountValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyAccountValueState", Account.class));
+    myListState = getRuntimeContext().getListState(new ListStateDescriptor<>("MyListState", Integer.class));
+    myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("MyMapState", Integer.class, Integer.class));
+  }
+  ...
+}
+```
+
+Then its state can be read with the following SQL statement:
+```SQL
+CREATE TABLE state_table (
+  k INTEGER,
+  MyValueState INTEGER,
+  MyAccountValueState ROW<id INTEGER, amount DOUBLE>,
+  MyListState ARRAY<INTEGER>,
+  MyMapState MAP<INTEGER, INTEGER>,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid' = 'my-uid'
+);
+```
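+Once the table is defined, the state can be queried like any other table. A minimal sketch, reusing the table and column names from the example above (the filter value is purely illustrative):
+```SQL
+-- Read the value state and a field of the nested account row,
+-- restricted to keys whose value state exceeds a threshold
+SELECT k, MyValueState, MyAccountValueState.amount AS amount
+FROM state_table
+WHERE MyValueState > 100;
+```
+Nested ROW fields are accessed with dot notation, while ARRAY and MAP columns can be processed with the usual collection functions.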
+
+### Connector options
+
+#### General options
+| Option             | Required | Default | Type                                           | Description |
+|--------------------|----------|---------|------------------------------------------------|-------------|
+| connector          | required | (none)  | String                                         | Specify what connector to use, here it should be 'savepoint'. |
+| state.backend.type | required | (none)  | Enum. Possible values: hashmap, rocksdb, forst | Defines the state backend which must be used for state reading. This must match the value which was defined in the Flink job that created the savepoint or checkpoint. |
+| state.path         | required | (none)  | String                                         | Defines the state path which must be used for state reading. All filesystems supported by Flink can be used here. |
+| operator.uid       | optional | (none)  | String                                         | Defines the operator UID which must be used for state reading (can't be used together with the UID hash). |

Review Comment:
   Changed.


