gaborgsomogyi commented on code in PR #26154:
URL: https://github.com/apache/flink/pull/26154#discussion_r1954831185
##########
docs/content/docs/libs/state_processor_api.md:
##########
@@ -514,3 +512,120 @@ savepointWriter
         OperatorIdentifier.forUid("new-uid"))
 ...
 ```

## Table API

### Getting started

Before getting started with the state SQL connector, make sure to review our [Flink SQL](../development/sql.md) guidelines.

IMPORTANT NOTE: The state Table API currently supports keyed state only.

### Keyed State

[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), or partitioned state, is any state that is partitioned relative to a key.

The SQL connector allows users to read arbitrary columns as ValueState, as well as complex state types such as ListState and MapState.
This means that if an operator contains a stateful process function such as:
```java
eventStream
    .keyBy(e -> (Integer)e.key)
    .process(new StatefulFunction())
    .uid("my-uid");

...

public class Account {
    private Integer id;
    public Double amount;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }
}

public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void> {
    private ValueState<Integer> myValueState;
    private ValueState<Account> myAccountValueState;
    private ListState<Integer> myListState;
    private MapState<Integer, Integer> myMapState;

    @Override
    public void open(OpenContext openContext) {
        myValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyValueState", Integer.class));
        myAccountValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyAccountValueState", Account.class));
        myListState = getRuntimeContext().getListState(new ListStateDescriptor<>("MyListState", Integer.class));
        myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("MyMapState", Integer.class, Integer.class));
    }
    ...
}
```

Then it can be read with the following SQL statement:
```SQL
CREATE TABLE state_table (
  k INTEGER,
  MyValueState INTEGER,
  MyAccountValueState ROW<id INTEGER, amount DOUBLE>,
  MyListState ARRAY<INTEGER>,
  MyMapState MAP<INTEGER, INTEGER>,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'rocksdb',
  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
  'operator.uid' = 'my-uid'
);
```

### Connector options

#### General options

| Option             | Required | Default | Type                                           | Description |
|--------------------|----------|---------|------------------------------------------------|-------------|
| connector          | required | (none)  | String                                         | Specify what connector to use, here it should be 'savepoint'. |
| state.backend.type | required | (none)  | Enum. Possible values: hashmap, rocksdb, forst | Defines the state backend to use for state reading. This must match the value that was defined in the Flink job which created the savepoint or checkpoint. |
| state.path         | required | (none)  | String                                         | Defines the state path to use for state reading. All file systems supported by Flink can be used here. |
| operator.uid       | optional | (none)  | String                                         | Defines the operator UID to use for state reading (can't be used together with operator.uid.hash). |
| operator.uid.hash  | optional | (none)  | String                                         | Defines the operator UID hash to use for state reading (can't be used together with operator.uid). |

#### Connector options for column '#'

| Option                  | Required | Default | Type                                    | Description |
|-------------------------|----------|---------|-----------------------------------------|-------------|
| fields.#.state-name     | optional | (none)  | String                                  | Overrides the state name to use for state reading. This can be useful when the state name contains characters that are not compliant with SQL column names. |
| fields.#.state-type     | optional | (none)  | Enum. Possible values: list, map, value | Defines the state type to use for state reading: value, list or map. If not provided, it is inferred from the SQL type (ARRAY=list, MAP=map, all others=value). |
| fields.#.map-key-format | optional | (none)  | String                                  | Defines the format class for decoding map key data (for example java.lang.Long). If not provided, it is inferred from the SQL type (only primitive types supported). |
| fields.#.value-format   | optional | (none)  | String                                  | Defines the format class for decoding value data (for example java.lang.Long). If not provided, it is inferred from the SQL type (only primitive types supported). |
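To make the column options concrete, here is a minimal sketch of a table that remaps a column onto a differently named state; the state name `My-Value-State` and the column name `my_value_state` are hypothetical:
```SQL
-- Minimal sketch: 'My-Value-State' and 'my_value_state' are hypothetical names.
-- The dash in the state name is not valid in a SQL column name, so
-- fields.#.state-name is used to map the column onto the state explicitly.
CREATE TABLE state_table_with_overrides (
  k INTEGER,
  my_value_state INTEGER,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'savepoint',
  'state.backend.type' = 'rocksdb',
  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
  'operator.uid' = 'my-uid',
  'fields.my_value_state.state-name' = 'My-Value-State',
  'fields.my_value_state.state-type' = 'value'
);
```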
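Once declared, such a table can be queried with regular Flink SQL. A minimal sketch against the `state_table` defined earlier:
```SQL
-- read every key together with its value state
SELECT k, MyValueState FROM state_table;

-- read a single map entry per key using Flink SQL's bracket syntax for map access
SELECT k, MyMapState[1] AS first_entry FROM state_table;
```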
### Default Data Type Mapping

The state SQL connector infers the data type of primitive types when `fields.#.value-format` and `fields.#.map-key-format`
are not defined. The following table shows the default `Flink SQL type` -> `Java type` mapping. If the inferred mapping is not correct,
it can be overridden with the two mentioned config parameters on a per-column basis.

| Flink SQL type          | Java type        |
|-------------------------|------------------|
| CHAR / VARCHAR / STRING | java.lang.String |
| BOOLEAN                 | boolean          |

Review Comment:
   boolean and Boolean can also be used.