xintongsong commented on code in PR #26057:
URL: https://github.com/apache/flink/pull/26057#discussion_r1926890754


##########
docs/content/docs/dev/datastream-v2/context_and_state_processing.md:
##########
@@ -0,0 +1,305 @@
+---
+title: Context and State Processing (to be updated)
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+{{< hint warning >}}
+**Note:** DataStream API V2 is a new set of APIs intended to gradually replace the original DataStream API. It is currently experimental and is not yet fully ready for production use.
+{{< /hint >}}
+
+# Context and State Processing
+
+## Context
+
+Unlike static attributes such as the name of a process operation, some information (such as the current key) can only be obtained while the process function is executing.
+To bridge process functions and the execution engine, the DataStream API provides a unified entry point called the Runtime Context.
+
+We divide the contextual information into several parts according to their functions:
+
+- JobInfo: Holds all job-related information, such as the job name and execution mode.
+
+- TaskInfo: Holds all task-related information, such as the parallelism.
+
+- MetricGroup: Manages context related to metrics, such as registering a metric.
+
+- StateManager: Manages context related to state, such as accessing a specific state.
+
+- WatermarkManager: Manages context related to watermarks, such as triggering a watermark.
+
+- ProcessingTimeManager: Manages context related to processing-time timers, such as getting the current processing time.
+
+This context information falls into two categories:
+
+* NonPartitionedContext: Includes JobInfo, TaskInfo, MetricGroup, and WatermarkManager.
+* PartitionedContext: Includes StateManager and ProcessingTimeManager.
+
+For most methods in `ProcessFunction`, either a `NonPartitionedContext` or a `PartitionedContext` is passed in,
+depending on whether the method can derive the current partition.
+
+For example, `processRecord` and `onProcessingTimer` have clear partition information, so they receive a `PartitionedContext` as an argument.
+On the other hand, `open` and `endInput` can only receive a `NonPartitionedContext`.
+
+The following code snippet shows how to get the parallelism and execution mode 
of the process function:
+
+```java
+new OneInputStreamProcessFunction<String, String>() {
+    private transient int parallelism;
+
+    private transient ExecutionMode executionMode;
+
+    @Override
+    public void open(NonPartitionedContext<String> ctx) throws Exception {
+        parallelism = ctx.getTaskInfo().getParallelism();
+        executionMode = ctx.getJobInfo().getExecutionMode();
+    }
+}
+```
+
+## State Processing
+
+State is the foundation of stateful computation. The DataStream API supports declaring state in a process function and
+accessing and updating it during data processing.
+
+In general, you should follow the principle of "declare first, use later".
+Writing a stateful process function involves three steps:
+
+1. Defining the state in the form of `StateDeclaration`.
+2. Declaring the state in `ProcessFunction#usesStates`.
+3. Getting and updating state via `StateManager`.
+
+Before we go any further, let's look at what a stateful process function looks 
like:
+
+```java
+private static class StatefulFunction implements 
OneInputStreamProcessFunction<Long, Long> {
+    // Step1: Defining the state in the form of `StateDeclaration`.
+    static final StateDeclaration.ListStateDeclaration<Long> 
LIST_STATE_DECLARATION =
+            StateDeclarations.listStateBuilder("example-list-state", 
TypeDescriptors.LONG).build();
+     
+    // Step2: Declaring the state in `ProcessFunction#usesStates`
+    @Override
+    public Set<StateDeclaration> usesStates() {
+        return Collections.singleton(LIST_STATE_DECLARATION);
+    }
+    
+    @Override
+    public void processRecord(Long record, Collector<Long> output, PartitionedContext<Long> ctx)
+            throws Exception {
+        // Step3: Getting and updating state via `StateManager`.
+        Optional<ListState<Long>> stateOptional =
+                ctx.getStateManager().getState(LIST_STATE_DECLARATION);
+        ListState<Long> state = stateOptional.get();
+        // Do something with the state, for example replace its contents with this record.
+        state.update(Collections.singletonList(record));
+    }
+}
+```
+
+### Define State
+
+A `StateDeclaration` defines a specific state. Two pieces of information must be provided for a state declaration:
+
+- Name: Used as the unique identifier of the state.
+
+- RedistributionMode: Defines how the state is redistributed between partitions. For a keyed partition stream, the state is bound to its partition, so no redistribution is required. For a non-keyed partition stream, the partitions change with the parallelism, so the state must define how it is redistributed.
+
+There are three `RedistributionMode` options to choose from:
+
+```java
+/**
+ * {@link RedistributionMode} is used to indicate whether this state supports 
redistribution
+ * between partitions and how to redistribute this state during rescaling.
+ */
+enum RedistributionMode {
+    /**
+     * Does not support redistribution.
+     *
+     * <p>For example: keyed state is bound to a specific key group, so it cannot be
+     * redistributed between partitions.
+     */
+    NONE,
+
+    /**
+     * This state can be safely redistributed between different partitions, 
and the specific
+     * redistribution strategy is determined by the state itself.
+     */
+    REDISTRIBUTABLE,
+
+    /** States are guaranteed to be identical in different partitions, thus 
redistribution is not a problem. */
+    IDENTICAL
+}
+```

Review Comment:
   This shouldn't be presented as a code snippet.



##########
docs/content/docs/dev/datastream-v2/context_and_state_processing.md:
##########
@@ -0,0 +1,305 @@
+For most methods in `ProcessFunction`, either a `NonPartitionedContext` or a `PartitionedContext` is passed in,
+depending on whether the method can derive the current partition.
+
+For example, `processRecord` and `onProcessingTimer` have clear partition information, so they receive a `PartitionedContext` as an argument.
+On the other hand, `open` and `endInput` can only receive a `NonPartitionedContext`.

Review Comment:
   Typically, a `PartitionedContext` is provided to a `ProcessFunction` when it
   is expected to process data within a specific partition, e.g., when a new
   record arrives or a timer fires. A `NonPartitionedContext`, on the other
   hand, is provided when no specific partition is relevant, e.g., when the
   process function is initialized or cleaned up.
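
   A minimal sketch of this distinction, in case it helps with the wording. It deliberately does not use the Flink interfaces: `SketchNonPartitionedContext`, `SketchPartitionedContext`, `SketchFunction`, `TaggingFunction` and `run` are hypothetical stand-ins invented for illustration, and the "key" is simply the first character of each record. The only point is that the lifecycle hook sees no partition, while the per-record hook does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ContextSketch {

    /** Hypothetical stand-in: information that exists without a current partition. */
    interface SketchNonPartitionedContext {
        int parallelism();
    }

    /** Hypothetical stand-in: information tied to the partition being processed. */
    interface SketchPartitionedContext {
        String currentKey();
    }

    /** Hypothetical function type with one lifecycle hook and one per-record hook. */
    interface SketchFunction {
        void open(SketchNonPartitionedContext ctx);

        void processRecord(String record, SketchPartitionedContext ctx, List<String> out);
    }

    /** Tags each record with its partition key and the parallelism seen in open(). */
    static final class TaggingFunction implements SketchFunction {
        private int parallelism;

        @Override
        public void open(SketchNonPartitionedContext ctx) {
            // No record is being processed here, so no partition is available.
            parallelism = ctx.parallelism();
        }

        @Override
        public void processRecord(String record, SketchPartitionedContext ctx, List<String> out) {
            // A record always belongs to a partition, so the key can be read.
            out.add(ctx.currentKey() + ":" + record + "@p" + parallelism);
        }
    }

    /** Toy driver: opens the function once, then feeds records keyed by first character. */
    static List<String> run(SketchFunction fn, int parallelism, List<String> records) {
        List<String> out = new ArrayList<>();
        fn.open(() -> parallelism);
        for (String record : records) {
            String key = record.substring(0, 1);
            fn.processRecord(record, () -> key, out);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new TaggingFunction(), 4, Arrays.asList("apple", "avocado", "banana")));
    }
}
```

   Keeping the two stand-in context types unrelated makes it a compile-time error to ask for the current key inside `open`, which is the property the paragraph is trying to describe.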


