Zakelly commented on code in PR #26154:
URL: https://github.com/apache/flink/pull/26154#discussion_r1955530219


##########
docs/content/docs/libs/state_processor_api.md:
##########
@@ -514,3 +512,119 @@ savepointWriter
         OperatorIdentifier.forUid("new-uid"))
     ...
 ```
+
+## Table API
+
+### Getting started
+
+Before you interrogate state using the table API, make sure to review our [Flink SQL](../development/sql.md) guidelines.
+
+IMPORTANT NOTE: The State Table API only supports keyed state.
+
+### Keyed State
+
+[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), also known as partitioned state, is any state that is partitioned relative to a key.
+
+The SQL connector allows users to read arbitrary columns as ValueState and complex state types such as ListState and MapState.
+This means that if an operator contains a stateful process function such as:
+```java
+eventStream
+  .keyBy(e -> (Integer)e.key)
+  .process(new StatefulFunction())
+  .uid("my-uid");
+
+...
+
+public class Account {
+    private Integer id;
+    public Double amount;
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+}
+
+public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void> {
+  private ValueState<Integer> myValueState;
+  private ValueState<Account> myAccountValueState;
+  private ListState<Integer> myListState;
+  private MapState<Integer, Integer> myMapState;
+
+  @Override
+  public void open(OpenContext openContext) {
+    myValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyValueState", Integer.class));
+    myAccountValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyAccountValueState", Account.class));
+    myListState = getRuntimeContext().getListState(new ListStateDescriptor<>("MyListState", Integer.class));
+    myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("MyMapState", Integer.class, Integer.class));
+  }
+  ...
+}
+```
+
+Then the state can be read by querying a table created using the following SQL statement:
+```SQL
+CREATE TABLE state_table (
+  k INTEGER,
+  MyValueState INTEGER,
+  MyAccountValueState ROW<id INTEGER, amount DOUBLE>,
+  MyListState ARRAY<INTEGER>,
+  MyMapState MAP<INTEGER, INTEGER>,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid' = 'my-uid'
+);
+```
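+
+As a minimal, hypothetical usage sketch (assuming the `state_table` defined above; the key value 42 is purely illustrative), the restored state can then be queried with plain SQL:
+```SQL
+-- Read the value, list and map state stored for a single key
+SELECT k, MyValueState, MyListState, MyMapState
+FROM state_table
+WHERE k = 42;
+```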
+
+### Connector options
+
+#### General options
+| Option             | Required | Default | Type                                          | Description                                                                                                                                                              |
+|--------------------|----------|---------|-----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| connector          | required | (none)  | String                                        | Specify what connector to use, here should be 'state'.                                                                                                                  |
+| state.backend.type | required | (none)  | Enum Possible values: hashmap, rocksdb, forst | Defines the state backend which must be used for state reading. This must match the value which was defined in the Flink job which created the savepoint or checkpoint. |
+| state.path         | required | (none)  | String                                        | Defines the state path which must be used for state reading. All file systems that are supported by Flink can be used here.                                             |
+| operator.uid       | optional | (none)  | String                                        | Defines the operator UID which must be used for state reading (can't be used together with `operator.uid.hash`).                                                        |
+| operator.uid.hash  | optional | (none)  | String                                        | Defines the operator UID hash which must be used for state reading (can't be used together with `operator.uid`).                                                        |
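+
+As an illustrative sketch only (the UID hash below is a made-up placeholder, and the connector value mirrors the earlier example), an operator can alternatively be addressed via `operator.uid.hash` instead of `operator.uid`:
+```SQL
+CREATE TABLE state_table_by_hash (
+  k INTEGER,
+  MyValueState INTEGER,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid.hash' = '<operator-uid-hash>'
+);
+```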

Review Comment:
   Additional note that either `operator.uid` or `operator.uid.hash` must be specified?



##########
docs/content/docs/libs/state_processor_api.md:
##########
@@ -514,3 +512,119 @@ savepointWriter
         OperatorIdentifier.forUid("new-uid"))
     ...
 ```
+
+## Table API
+
+### Getting started
+
+Before you interrogate state using the table API, make sure to review our [Flink SQL](../development/sql.md) guidelines.
+
+IMPORTANT NOTE: The State Table API only supports keyed state.
+
+### Keyed State
+
+[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), also known as partitioned state, is any state that is partitioned relative to a key.
+
+The SQL connector allows users to read arbitrary columns as ValueState and complex state types such as ListState and MapState.
+This means that if an operator contains a stateful process function such as:
+```java
+eventStream
+  .keyBy(e -> (Integer)e.key)
+  .process(new StatefulFunction())
+  .uid("my-uid");
+
+...
+
+public class Account {
+    private Integer id;
+    public Double amount;
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+}
+
+public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void> {
+  private ValueState<Integer> myValueState;
+  private ValueState<Account> myAccountValueState;
+  private ListState<Integer> myListState;
+  private MapState<Integer, Integer> myMapState;
+
+  @Override
+  public void open(OpenContext openContext) {
+    myValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyValueState", Integer.class));
+    myAccountValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyAccountValueState", Account.class));
+    myListState = getRuntimeContext().getListState(new ListStateDescriptor<>("MyListState", Integer.class));
+    myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("MyMapState", Integer.class, Integer.class));
+  }
+  ...
+}
+```
+
+Then the state can be read by querying a table created using the following SQL statement:
+```SQL
+CREATE TABLE state_table (
+  k INTEGER,
+  MyValueState INTEGER,
+  MyAccountValueState ROW<id INTEGER, amount DOUBLE>,
+  MyListState ARRAY<INTEGER>,
+  MyMapState MAP<INTEGER, INTEGER>,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid' = 'my-uid'
+);
+```
+
+### Connector options
+
+#### General options
+| Option             | Required | Default | Type                                          | Description                                             |
+|--------------------|----------|---------|-----------------------------------------------|---------------------------------------------------------|
+| connector          | required | (none)  | String                                        | Specify what connector to use, here should be 'state'.  |

Review Comment:
   ```suggestion
   | connector          | required | (none)  | String                                        | Specify what connector to use, here should be 'savepoint'. |
   ```



##########
docs/content/docs/libs/state_processor_api.md:
##########
@@ -514,3 +512,119 @@ savepointWriter
         OperatorIdentifier.forUid("new-uid"))
     ...
 ```
+
+## Table API
+
+### Getting started
+
+Before you interrogate state using the table API, make sure to review our [Flink SQL](../development/sql.md) guidelines.

Review Comment:
   Seems invalid link?
   ```suggestion
   Before you interrogate state using the table API, make sure to review our [Flink SQL]({{< ref "docs/dev/table/overview" >}}) guidelines.
   ```
   OR
   ```suggestion
   Before you interrogate state using the table API, make sure to review our [Flink SQL]({{< ref "docs/dev/table/sql/overview" >}}) guidelines.
   ```



##########
docs/content/docs/libs/state_processor_api.md:
##########
@@ -514,3 +512,119 @@ savepointWriter
         OperatorIdentifier.forUid("new-uid"))
     ...
 ```
+
+## Table API
+
+### Getting started
+
+Before you interrogate state using the table API, make sure to review our [Flink SQL](../development/sql.md) guidelines.
+
+IMPORTANT NOTE: The State Table API only supports keyed state.
+
+### Keyed State
+
+[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), also known as partitioned state, is any state that is partitioned relative to a key.
+
+The SQL connector allows users to read arbitrary columns as ValueState and complex state types such as ListState and MapState.
+This means that if an operator contains a stateful process function such as:
+```java
+eventStream
+  .keyBy(e -> (Integer)e.key)
+  .process(new StatefulFunction())
+  .uid("my-uid");
+
+...
+
+public class Account {
+    private Integer id;
+    public Double amount;
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+}
+
+public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void> {
+  private ValueState<Integer> myValueState;
+  private ValueState<Account> myAccountValueState;
+  private ListState<Integer> myListState;
+  private MapState<Integer, Integer> myMapState;
+
+  @Override
+  public void open(OpenContext openContext) {
+    myValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyValueState", Integer.class));
+    myAccountValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyAccountValueState", Account.class));
+    myListState = getRuntimeContext().getListState(new ListStateDescriptor<>("MyListState", Integer.class));
+    myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("MyMapState", Integer.class, Integer.class));
+  }
+  ...
+}
+```
+
+Then the state can be read by querying a table created using the following SQL statement:
+```SQL
+CREATE TABLE state_table (
+  k INTEGER,
+  MyValueState INTEGER,
+  MyAccountValueState ROW<id INTEGER, amount DOUBLE>,
+  MyListState ARRAY<INTEGER>,
+  MyMapState MAP<INTEGER, INTEGER>,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid' = 'my-uid'
+);
+```
+
+### Connector options
+
+#### General options
+| Option             | Required | Default | Type                                          | Description                                                                                                                                                              |
+|--------------------|----------|---------|-----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| connector          | required | (none)  | String                                        | Specify what connector to use, here should be 'state'.                                                                                                                  |
+| state.backend.type | required | (none)  | Enum Possible values: hashmap, rocksdb, forst | Defines the state backend which must be used for state reading. This must match the value which was defined in the Flink job which created the savepoint or checkpoint. |

Review Comment:
   IIUC, `forst` will not be supported if https://github.com/apache/flink/pull/26134/files#diff-c6d7f03b3d79d772c126e131a60d031f19a8fb3ac0a68d6cda81dfd2d216ff7cR388 is merged, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
