xintongsong commented on code in PR #26057:
URL: https://github.com/apache/flink/pull/26057#discussion_r1926349435


##########
docs/content/docs/dev/datastream-v2/overview.md:
##########
@@ -0,0 +1,195 @@
+---
+title: Overview
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink DataStream API Programming Guide
+
+DataStream programs in Flink are regular programs that implement transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for
+example write the data to files, or to standard output (for example the command line
+terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+Note: DataStream API V2 is a new set of APIs, to gradually replace the original DataStream API. It is currently in the experimental stage and is not fully available for production.

Review Comment:
   We should highlight this with a warning block in all the DSv2 docs.
   See the fine-grained resource management (FGRM) page for an example:
   https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/finegrained_resource/
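   
   A possible shape for that block, assuming the `hint warning` shortcode that event_timer_service.md in this PR already uses. The wording is copied from the current note and can be adjusted:
   
   ```suggestion
   {{< hint warning >}}
   **Note:** DataStream API V2 is a new set of APIs, to gradually replace the original DataStream API. It is currently in the experimental stage and is not fully available for production.
   {{< /hint >}}
   ```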



##########
docs/content/docs/dev/datastream-v2/overview.md:
##########
@@ -0,0 +1,195 @@
+---
+title: Overview
+weight: 2
+type: docs
+---
+
+# Flink DataStream API Programming Guide
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data 
streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned 
via sinks, which may for
+example write the data to files, or to standard output (for example the 
command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+Note: DataStream API V2 is a new set of APIs, to gradually replace the 
original DataStream API. It is currently in the experimental stage and is not 
fully available for production.
+
+## What is a DataStream?
+
+The DataStream API gets its name from the special `DataStream` class that is
+used to represent a collection of data in a Flink program. You can think of
+them as immutable collections of data that can contain duplicates. This data
+can either be finite or unbounded, the API that you use to work on them is the
+same.
+
+A `DataStream` is similar to a regular Java `Collection` in terms of usage but
+is quite different in some key ways. They are immutable, meaning that once they
+are created you cannot add or remove elements. You can also not simply inspect
+the elements inside but only work on them using the `DataStream` API
+operations, which are also called transformations.
+
+You can create an initial `DataStream` by adding a source in a Flink program.
+Then you can derive new streams from this and combine them by using API methods
+such as `process`, `connectAndProcess`, and so on.
+
+## Fundamental Primitives and Extensions
+
+Based on whether its functionality must be provided by flink, we divide the 
relevant concepts in DataStream API 
+into two categories: fundamental primitives and high-Level extensions.
+
+### Fundamental primitives
+
+Fundamental primitives are the basic and necessary semantics that flink need to provide in order to
+define a stateful stream processing application, which cannot be achieved by users if not provided by
+the framework. It includes dataStream, partitioning, process function, state, processing timer service,
+watermark and async processing(not provided for now).

Review Comment:
   ```suggestion
   watermark and async processing (not yet available).
   ```



##########
docs/content/docs/dev/datastream-v2/overview.md:
##########
@@ -0,0 +1,195 @@
+---
+title: Overview
+weight: 2
+type: docs
+---
+
+# Flink DataStream API Programming Guide
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data 
streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned 
via sinks, which may for
+example write the data to files, or to standard output (for example the 
command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+Note: DataStream API V2 is a new set of APIs, to gradually replace the 
original DataStream API. It is currently in the experimental stage and is not 
fully available for production.
+
+## What is a DataStream?
+
+The DataStream API gets its name from the special `DataStream` class that is
+used to represent a collection of data in a Flink program. You can think of
+them as immutable collections of data that can contain duplicates. This data
+can either be finite or unbounded, the API that you use to work on them is the
+same.
+
+A `DataStream` is similar to a regular Java `Collection` in terms of usage but
+is quite different in some key ways. They are immutable, meaning that once they
+are created you cannot add or remove elements. You can also not simply inspect
+the elements inside but only work on them using the `DataStream` API
+operations, which are also called transformations.
+
+You can create an initial `DataStream` by adding a source in a Flink program.
+Then you can derive new streams from this and combine them by using API methods
+such as `process`, `connectAndProcess`, and so on.
+
+## Fundamental Primitives and Extensions
+
+Based on whether its functionality must be provided by flink, we divide the relevant concepts in DataStream API
+into two categories: fundamental primitives and high-Level extensions.

Review Comment:
   ```suggestion
   into two categories: fundamental primitives and high-level extensions.
   ```



##########
docs/content/docs/dev/datastream-v2/overview.md:
##########
@@ -0,0 +1,195 @@
+---
+title: Overview
+weight: 2
+type: docs
+---
+
+# Flink DataStream API Programming Guide
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data 
streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned 
via sinks, which may for
+example write the data to files, or to standard output (for example the 
command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+Note: DataStream API V2 is a new set of APIs, to gradually replace the 
original DataStream API. It is currently in the experimental stage and is not 
fully available for production.
+
+## What is a DataStream?
+
+The DataStream API gets its name from the special `DataStream` class that is
+used to represent a collection of data in a Flink program. You can think of
+them as immutable collections of data that can contain duplicates. This data
+can either be finite or unbounded, the API that you use to work on them is the
+same.
+
+A `DataStream` is similar to a regular Java `Collection` in terms of usage but
+is quite different in some key ways. They are immutable, meaning that once they
+are created you cannot add or remove elements. You can also not simply inspect
+the elements inside but only work on them using the `DataStream` API
+operations, which are also called transformations.
+
+You can create an initial `DataStream` by adding a source in a Flink program.
+Then you can derive new streams from this and combine them by using API methods
+such as `process`, `connectAndProcess`, and so on.
+
+## Fundamental Primitives and Extensions
+
+Based on whether its functionality must be provided by flink, we divide the 
relevant concepts in DataStream API 
+into two categories: fundamental primitives and high-Level extensions.
+
+### Fundamental primitives
+
+Fundamental primitives are the basic and necessary semantics that flink need 
to provide in order to
+define a stateful stream processing application, which cannot be achieved by 
users if not provided by
+the framework. It includes dataStream, partitioning, process function, state, 
processing timer service,
+watermark and async processing(not provided for now).
+
+More details can be found in:
+- [Building Blocks]({{< ref "docs/dev/datastream-v2/building_blocks" >}}): 
Given the most basic elements of DataStream API.
+- [State Processing]({{< ref 
"docs/dev/datastream-v2/context_and_state_processing" >}}): Explanation of how 
to develop stateful applications.
+- [Time Processing # Processing Timer Service]({{< ref 
"docs/dev/datastream-v2/time-processing/processing_timer_service" >}}): 
Explanation of how to handle processing time.
+- [Watermark]({{< ref "docs/dev/datastream-v2/watermark/define_watermark" 
>}}): Explanation of how to define and handle user defined events.
+
+### High-Level Extensions
+
+High-Level extensions are like short-cuts / sugars, without which users can probably still achieve the same
+behavior by working with the fundamental APIs, but would be a lot easier with the builtin supports.
+It includes common built-in functions(e.g. map, filter, reduce, etc. not provided for now), event timer service, window and join.

Review Comment:
   ```suggestion
   It includes common built-in functions (e.g. map, filter, reduce, etc. not provided for now), event timer service, window and join.
   ```



##########
docs/content/docs/dev/datastream-v2/time-processing/event_timer_service.md:
##########
@@ -0,0 +1,427 @@
+---
+title: "Event Timer Service"
+weight: 9
+type: docs
+aliases:
+---
+
+# Event Timer Service
+
+The event timer service is a high-level extension of the Flink DataStream API, 
provided by Flink. 
+It enables users to register timers for executing calculations at specific 
event time points and helps determine when to trigger windows within the Flink 
framework.
+
+For a comprehensive explanation of event time, please refer to the section on 
[Notions of Time: Event Time and Processing Time]({{< ref "docs/concepts/time" 
>}}#notions-of-time-event-time-and-processing-time).
+
+In this section, we will introduce how to utilize the event timer service 
within the Flink DataStream API.
+
+We use a special type of [Watermark]({{< ref 
"docs/dev/datastream-v2/watermark" >}}) to denote the progression of event time 
in the stream, 
+which we refer to as the event time watermark. Additionally, to dealing idle 
inputs or sources, we implement another type of 
+[Watermark]({{< ref "docs/dev/datastream-v2/watermark" >}}) to indicate that 
the input or source 
+is idle. 
+This is known as the idle status watermark. For more information, please refer 
to [Dealing With Idle Inputs / Sources](#dealing-with-idle-inputs--sources). 
+In this document, we collectively refer to the event time watermark and the 
idle status watermark as event-time related watermarks.
+
+The core of the event timer service lies in generating and propagating 
event-time related watermarks through the streams. 
+To achieve this, we need to address two aspects:
+1. How to generate event-time related watermarks.
+2. How to handle event-time related watermarks.
+
+The following will introduce these two aspects.
+
+## Generate Event-Time related Watermarks
+
+In order to work with *event time*, Flink needs to know the events
+*timestamps*, meaning each element in the stream needs to have its event
+timestamp *assigned*. This is usually done by accessing/extracting the
+timestamp from some field in the element.
+
+Once the timestamps have been extracted, Flink generates event time 
watermarks. 
+There are two methods for doing so: one is to use the 
`EventTimeWatermarkGeneratorBuilder` provided by Flink, 
+and the other is to implement a custom `ProcessFunction`.
+
+Users should select one of these two approaches to suit their needs.
+
+### Generate Watermarks by WatermarkGeneratorBuilder
+
+Users can utilize the `EventTimeWatermarkGeneratorBuilder` to generate 
event-time related watermarks.
+There are four aspects of the `EventTimeWatermarkGeneratorBuilder` that users 
can configure:
+
+1. [Required] `EventTimeExtractor`
+
+   This function instructs Flink on how to extract the event time from each 
record.
+
+3. [Optional] Input Idle Timeout
+
+   If the input stream remains idle for a specified duration, Flink will 
ignore this input when 
+combining event-time related watermarks to prevent stalling the progression of 
event time. The default value for this timeout is 0.
+
+    For more information, please refer to the [Dealing With Idle Inputs / 
Sources](#dealing-with-idle-inputs--sources).
+
+4. [Optional] Out-of-Order Time
+
+    To accommodate the disorder of input records, user can set a maximum 
out-of-order time for the event time watermark. 
+The default value for this setting is also 0.
+
+   For more information, please refer to the [Fixed Amount of 
Lateness](#fixed-amount-of-lateness).
+
+5. [Optional] Generation Frequency
+
+   Flink offers three scenarios for generating event-time related watermarks:
+     - No EventTimeWatermarks are generated and emitted.
+     - EventTimeWatermarks are generated and emitted periodically.
+     - EventTimeWatermarks are generated and emitted for each event.
+
+   By default, Flink adopts the second approach, which involves periodically 
generating and emitting 
+event-time related watermarks. The interval for this periodic generation is 
determined by the configuration 
+"pipeline.auto-watermark-interval."
+
+After configuring and obtaining an instance of the 
`EventTimeWatermarkGeneratorBuilder`, 
+users can utilize it to build a `ProcessFunction`. This `ProcessFunction` will 
extract the event time 
+from each record and generates the corresponding event-time related watermark.
+
+A typical example of using `EventTimeWatermarkGeneratorBuilder` is shown in 
below:
+
+```java
+NonKeyedPartitionStream stream = ...;
+
+EventTimeWatermarkGeneratorBuilder<POJO> builder = EventTimeExtension
+        .newWatermarkGeneratorBuilder(pojo -> pojo.getTimeStamp())  // set event time extractor
+        .withIdleness(Duration.ofSeconds(10))           // set input idle timeout
+        .withMaxOutOfOrderTime(Duration.ofSeconds(30))  // set max out-of-order time
+        .periodicWatermark(Duration.ofMillis(200));      // set periodic watermark generation interval
+
+stream.process(builder.buildAsProcessFunction())
+      .process(...);
+```
+
+{{< hint warning >}}
+**Attention**: Both timestamps and event time watermarks
+are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
+{{< /hint >}}
+
+#### Fixed Amount of Lateness
+
+Another example of event time watermark generation is when the event time watermark lags

Review Comment:
   Where is the example? How is it different from the above one?
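   
   To make the request concrete: the section could show a watermark that trails the highest timestamp seen so far by a fixed bound. The following is a plain-Java sketch of that idea only. `FixedLatenessWatermark` and its methods are hypothetical names, not Flink API, and the generator built by `EventTimeWatermarkGeneratorBuilder` may differ in its details.
   
   ```java
   import java.time.Duration;
   
   /** Hypothetical illustration of a fixed-lateness watermark; not a Flink class. */
   public class FixedLatenessWatermark {
       private final long maxOutOfOrderMillis;
       private long maxSeenTimestamp = Long.MIN_VALUE;
   
       public FixedLatenessWatermark(Duration maxOutOfOrder) {
           this.maxOutOfOrderMillis = maxOutOfOrder.toMillis();
       }
   
       /** Records an event timestamp, in milliseconds since the epoch. */
       public void onEvent(long timestampMillis) {
           maxSeenTimestamp = Math.max(maxSeenTimestamp, timestampMillis);
       }
   
       /** Returns the highest timestamp seen so far minus the fixed lateness bound. */
       public long currentWatermark() {
           if (maxSeenTimestamp == Long.MIN_VALUE) {
               return Long.MIN_VALUE; // no event observed yet
           }
           return maxSeenTimestamp - maxOutOfOrderMillis;
       }
   }
   ```
   
   An example along these lines would also make clear how this subsection differs from the builder example above, which already sets `withMaxOutOfOrderTime`.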



##########
docs/content/docs/dev/datastream-v2/builtin-funcs/joining.md:
##########
@@ -0,0 +1,116 @@
+---
+title: "Joining"
+weight: 9 
+type: docs
+aliases:
+---
+
+# Joining
+
+Join is used to merge two data streams by matching elements from both streams 
based on a common key, 
+and performing calculations on the matched elements. 
+
+This section will introduce the Join operation in DataStream in detail.
+
+Please note that currently, DataStream supports only non-window INNER joins. 
+Other types of joins, such as interval joins, lookup joins, and window joins, 
will be supported in the future.
+
+## Non-Window Join
+
+In non-window Join, Flink first creates two built-in states to store the input 
data, 
+specifically for each of the two input data streams. 
+When data arrives from one side, Flink attempts to match it with the data 
stored in the state 
+of the other side and performs computations on the matching elements.
+
+During this process, users need to define how to compute the matching elements 
using [JoinFunction](#joinfunction),
+then use the [relevant APIs](#apis-for-performing-join) to perform the Join.
+
+Please note that the Join requires both input streams to be KeyedStream.
+
+### JoinFunction

Review Comment:
   The storyline is in an odd order. Users first decide which streams to join, then specify the join key, and only then decide on the processing logic.
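   
   A plain-Java sketch of that order, for illustration: the two inputs are fixed by the type parameters, then the key extractors are given, and the join logic comes last. It also mirrors the mechanism the section describes, with one state per side and matching against the other side on arrival. `NonWindowJoinSketch` and all of its members are hypothetical and not part of the DataStream API.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.BiFunction;
   import java.util.function.Function;
   
   /** Hypothetical in-memory inner join; not a Flink class. */
   public class NonWindowJoinSketch<L, R, K, OUT> {
       private final Function<L, K> leftKey;
       private final Function<R, K> rightKey;
       private final BiFunction<L, R, OUT> joinLogic;
       // One state per input side, holding every record seen so far for each key.
       private final Map<K, List<L>> leftState = new HashMap<>();
       private final Map<K, List<R>> rightState = new HashMap<>();
   
       public NonWindowJoinSketch(
               Function<L, K> leftKey, Function<R, K> rightKey, BiFunction<L, R, OUT> joinLogic) {
           this.leftKey = leftKey;
           this.rightKey = rightKey;
           this.joinLogic = joinLogic;
       }
   
       /** Stores a left record, then joins it with the stored right records of the same key. */
       public List<OUT> onLeft(L record) {
           K key = leftKey.apply(record);
           leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
           List<OUT> out = new ArrayList<>();
           for (R other : rightState.getOrDefault(key, Collections.<R>emptyList())) {
               out.add(joinLogic.apply(record, other));
           }
           return out;
       }
   
       /** Stores a right record, then joins it with the stored left records of the same key. */
       public List<OUT> onRight(R record) {
           K key = rightKey.apply(record);
           rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
           List<OUT> out = new ArrayList<>();
           for (L other : leftState.getOrDefault(key, Collections.<L>emptyList())) {
               out.add(joinLogic.apply(other, record));
           }
           return out;
       }
   }
   ```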



##########
docs/content/docs/dev/datastream-v2/overview.md:
##########
@@ -0,0 +1,195 @@
+---
+title: Overview
+weight: 2
+type: docs
+---
+
+# Flink DataStream API Programming Guide
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data 
streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned 
via sinks, which may for
+example write the data to files, or to standard output (for example the 
command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+Note: DataStream API V2 is a new set of APIs, to gradually replace the 
original DataStream API. It is currently in the experimental stage and is not 
fully available for production.
+
+## What is a DataStream?
+
+The DataStream API gets its name from the special `DataStream` class that is
+used to represent a collection of data in a Flink program. You can think of
+them as immutable collections of data that can contain duplicates. This data
+can either be finite or unbounded, the API that you use to work on them is the
+same.
+
+A `DataStream` is similar to a regular Java `Collection` in terms of usage but
+is quite different in some key ways. They are immutable, meaning that once they
+are created you cannot add or remove elements. You can also not simply inspect
+the elements inside but only work on them using the `DataStream` API
+operations, which are also called transformations.
+
+You can create an initial `DataStream` by adding a source in a Flink program.
+Then you can derive new streams from this and combine them by using API methods
+such as `process`, `connectAndProcess`, and so on.
+
+## Fundamental Primitives and Extensions
+
+Based on whether its functionality must be provided by flink, we divide the 
relevant concepts in DataStream API 
+into two categories: fundamental primitives and high-Level extensions.
+
+### Fundamental primitives
+
+Fundamental primitives are the basic and necessary semantics that flink need to provide in order to
+define a stateful stream processing application, which cannot be achieved by users if not provided by
+the framework. It includes dataStream, partitioning, process function, state, processing timer service,

Review Comment:
   ```suggestion
   the framework. It includes data stream, partitioning, process function, state, processing timer service,
   ```



##########
docs/content/docs/dev/datastream-v2/building_blocks.md:
##########
@@ -0,0 +1,215 @@
+---
+title: Building Blocks
+weight: 3
+type: docs
+---
+
+# Building Blocks
+
+DataStream, Partitioning, ProcessFunction are the most fundamental elements of 
DataStream API and respectively represent:
+
+- What are the types of data streams
+
+- How data is partitioned
+
+- How to perform operations / processing on data streams
+
+They are also the core parts of the fundamental primitives provided by 
DataStream API.
+
+## DataStream
+
+Data flows on the stream may be divided into multiple partitions. According to 
how the data is partitioned on the stream, we divide it into the following 
categories:
+
+- Global Stream: Force single partition/parallelism, and the correctness of 
data depends on this.
+
+- Partition Stream:
+  - Divide data into multiple partitions. State is only available within the 
partition. One partition can only be processed by one task, but one task can 
handle one or multiple partitions.
+According to the partitioning approach, it can be further divided into the 
following two categories:
+
+    - Keyed Partition Stream: Each key is a partition, and the partition to 
which the data belongs is deterministic.
+
+    - Non-Keyed Partition Stream: Each parallelism is a partition, and the 
partition to which the data belongs is nondeterministic.
+
+- Broadcast Stream: Each partition contains the same data.
+
+## Partitioning
+
+Above we defined the stream and how it is partitioned. The next topic to 
discuss is how to convert
+between different partition types. We call these transformations partitioning.
+For example non-keyed partition stream can be transformed to keyed partition 
stream via a `KeyBy` partitioning.
+
+```java
+NonKeyedPartitionStream<Tuple<Integer, String>> stream = ...;
+KeyedPartitionStream<Integer, String> keyedStream = stream.keyBy(record -> record.f0);
+```
+
+Overall, we have the following four partitioning:
+ 
+- KeyBy: Let all data be repartitioned according to specified key.
+
+- Shuffle: Repartition and shuffle all data.
+
+- Global: Merge all partitions into one. 
+
+- Broadcast: Force partitions broadcast data to downstream.
+
+The specific transformation relationship is shown in the following table:
+
+| Partitioning | Global | Keyed | NonKeyed |      Broadcast      |
+|:------------:|:------:|:-----:|:--------:|:-------------------:|
+|    Global    |   ❎    | KeyBy | Shuffle  |      Broadcast     |
+|    Keyed     | Global | KeyBy | Shuffle  |      Broadcast      |
+|   NonKeyed   | Global | KeyBy | Shuffle  |      Broadcast      |
+|  Broadcast   |   ❎   |  ❎   |    ❎    |          ❎        |
+
+(A crossed box indicates that it is not supported or not required)
+
+One thing to note is: broadcast can only be used in conjunction with other 
inputs and cannot be directly converted to other streams.
+
+## ProcessFunction
+Once we have the data stream, we can apply operations on it. The operations 
that can be performed over
+DataStream are collectively called Process Function. It is the only entrypoint 
for defining all kinds
+of processing on the data streams.
+
+### Classification of ProcessFunction
+According to the number of input / output, they are classified as follows:
+
+|               Partitioning                |    Number of Inputs    |        Number of Outputs        |
+|:-----------------------------------------:|:----------------------:|:-------------------------------:|
+|    OneInputStreamProcessFunction          |           1            |                1                |
+| TwoInputNonBroadcastStreamProcessFunction |           2            |                1                |
+|  TwoInputBroadcastStreamProcessFunction   |           2            |                1                |
+|      TwoOutputStreamProcessFunction       |           1            |                2                |
+Logically, process functions that support more inputs and outputs can be achieved by combining them,
+but this implementation might be inefficient. If the call for this becomes louder,
+we will consider supporting as many output edges as we want through a mechanism like OutputTag.
+But this loses the explicit generic type information that comes with using ProcessFunction.

Review Comment:
   This is not user-facing.



##########
docs/content/docs/dev/datastream-v2/building_blocks.md:
##########
@@ -0,0 +1,215 @@
+---
+title: Building Blocks
+weight: 3
+type: docs
+---
+
+# Building Blocks
+
+DataStream, Partitioning, ProcessFunction are the most fundamental elements of 
DataStream API and respectively represent:
+
+- What are the types of data streams
+
+- How data is partitioned
+
+- How to perform operations / processing on data streams
+
+They are also the core parts of the fundamental primitives provided by 
DataStream API.
+
+## DataStream
+
+Data flows on the stream may be divided into multiple partitions. According to 
how the data is partitioned on the stream, we divide it into the following 
categories:
+
+- Global Stream: Force single partition/parallelism, and the correctness of 
data depends on this.
+
+- Partition Stream:
+  - Divide data into multiple partitions. State is only available within the 
partition. One partition can only be processed by one task, but one task can 
handle one or multiple partitions.
+According to the partitioning approach, it can be further divided into the 
following two categories:
+
+    - Keyed Partition Stream: Each key is a partition, and the partition to 
which the data belongs is deterministic.
+
+    - Non-Keyed Partition Stream: Each parallelism is a partition, and the 
partition to which the data belongs is nondeterministic.
+
+- Broadcast Stream: Each partition contains the same data.
+
+## Partitioning
+
+Above we defined the stream and how it is partitioned. The next topic to 
discuss is how to convert
+between different partition types. We call these transformations partitioning.
+For example non-keyed partition stream can be transformed to keyed partition 
stream via a `KeyBy` partitioning.
+
+```java
+NonKeyedPartitionStream<Tuple<Integer, String>> stream = ...;
+KeyedPartitionStream<Integer, String> keyedStream = stream.keyBy(record -> 
record.f0);
+```
+
+Overall, we have the following four partitioning:
+ 
+- KeyBy: Let all data be repartitioned according to specified key.
+
+- Shuffle: Repartition and shuffle all data.
+
+- Global: Merge all partitions into one. 
+
+- Broadcast: Force partitions broadcast data to downstream.
+
+The specific transformation relationship is shown in the following table:
+
+| Partitioning | Global | Keyed | NonKeyed |      Broadcast      |
+|:------------:|:------:|:-----:|:--------:|:-------------------:|
+|    Global    |   ❎    | KeyBy | Shuffle  |      Broadcast     |
+|    Keyed     | Global | KeyBy | Shuffle  |      Broadcast      |
+|   NonKeyed   | Global | KeyBy | Shuffle  |      Broadcast      |
+|  Broadcast   |   ❎   |  ❎   |    ❎    |          ❎        |
+
+(A crossed box indicates that it is not supported or not required)
+
+One thing to note is: broadcast can only be used in conjunction with other 
inputs and cannot be directly converted to other streams.
+
+## ProcessFunction
+Once we have the data stream, we can apply operations on it. The operations 
that can be performed over
+DataStream are collectively called Process Function. It is the only entrypoint 
for defining all kinds
+of processing on the data streams.
+
+### Classification of ProcessFunction
+According to the number of input / output, they are classified as follows:
+
+|               Partitioning                |    Number of Inputs    |        Number of Outputs        |
+|:-----------------------------------------:|:----------------------:|:-------------------------------:|
+|    OneInputStreamProcessFunction          |           1            |                1                |
+| TwoInputNonBroadcastStreamProcessFunction |           2            |                1                |
+|  TwoInputBroadcastStreamProcessFunction   |           2            |                1                |
+|      TwoOutputStreamProcessFunction       |           1            |                2                |
+
+Logically, process functions that support more inputs and outputs can be 
achieved by combining them, 

Review Comment:
   What is `them`?



##########
docs/content/docs/dev/datastream-v2/builtin-funcs/joining.md:
##########
@@ -0,0 +1,116 @@
+---
+title: "Joining"
+weight: 9 
+type: docs
+aliases:
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Joining
+
+Join is used to merge two data streams by matching elements from both streams 
based on a common key, 
+and performing calculations on the matched elements. 
+
+This section will introduce the Join operation in DataStream in detail.
+
+Please note that currently, DataStream supports only non-window INNER joins. 
+Other types of joins, such as interval joins, lookup joins, and window joins, 
will be supported in the future.
+
+## Non-Window Join
+
+In non-window Join, Flink first creates two built-in states to store the input 
data, 
+specifically for each of the two input data streams. 
+When data arrives from one side, Flink attempts to match it with the data 
stored in the state 
+of the other side and performs computations on the matching elements.

Review Comment:
   This is internal.



##########
docs/content/docs/dev/datastream-v2/building_blocks.md:
##########
@@ -0,0 +1,215 @@
+---
+title: Building Blocks
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Building Blocks
+
+DataStream, Partitioning, ProcessFunction are the most fundamental elements of 
DataStream API and respectively represent:
+
+- What are the types of data streams
+
+- How data is partitioned
+
+- How to perform operations / processing on data streams
+
+They are also the core parts of the fundamental primitives provided by 
DataStream API.
+
+## DataStream
+
+Data flows on the stream may be divided into multiple partitions. According to 
how the data is partitioned on the stream, we divide it into the following 
categories:
+
+- Global Stream: Force single partition/parallelism, and the correctness of 
data depends on this.
+
+- Partition Stream:
+  - Divide data into multiple partitions. State is only available within the 
partition. One partition can only be processed by one task, but one task can 
handle one or multiple partitions.
+According to the partitioning approach, it can be further divided into the 
following two categories:
+
+    - Keyed Partition Stream: Each key is a partition, and the partition to 
which the data belongs is deterministic.
+
+    - Non-Keyed Partition Stream: Each parallelism is a partition, and the 
partition to which the data belongs is nondeterministic.
+
+- Broadcast Stream: Each partition contains the same data.
+
+## Partitioning
+
+Above we defined the stream and how it is partitioned. The next topic to 
discuss is how to convert
+between different partition types. We call these transformations partitioning.
+For example non-keyed partition stream can be transformed to keyed partition 
stream via a `KeyBy` partitioning.
+
+```java
+NonKeyedPartitionStream<Tuple<Integer, String>> stream = ...;
+KeyedPartitionStream<Integer, String> keyedStream = stream.keyBy(record -> 
record.f0);
+```
+
+Overall, we have the following four partitioning:
+ 
+- KeyBy: Let all data be repartitioned according to specified key.
+
+- Shuffle: Repartition and shuffle all data.
+
+- Global: Merge all partitions into one. 
+
+- Broadcast: Force partitions broadcast data to downstream.
+
+The specific transformation relationship is shown in the following table:
+
+| Partitioning | Global | Keyed | NonKeyed |      Broadcast      |
+|:------------:|:------:|:-----:|:--------:|:-------------------:|
+|    Global    |   ❎    | KeyBy | Shuffle  |      Broadcast     |
+|    Keyed     | Global | KeyBy | Shuffle  |      Broadcast      |
+|   NonKeyed   | Global | KeyBy | Shuffle  |      Broadcast      |
+|  Broadcast   |   ❎   |  ❎   |    ❎    |          ❎        |

Review Comment:
   What do the columns and rows mean?
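
   To make the question concrete, here is one possible reading, sketched in plain 
   Java. It assumes each row is the source stream type and each column is the 
   target stream type, which the doc does not state explicitly. `StreamKind` and 
   `PartitioningTable` are hypothetical names used only for illustration; they are 
   not Flink APIs.

   ```java
   import java.util.EnumMap;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical names for illustration only; none of these types exist in Flink.
   enum StreamKind { GLOBAL, KEYED, NON_KEYED, BROADCAST }

   final class PartitioningTable {
       // Outer key: source stream type (table row). Inner key: target stream type (table column).
       private static final Map<StreamKind, Map<StreamKind, String>> TABLE =
               new EnumMap<>(StreamKind.class);

       static {
           for (StreamKind source : StreamKind.values()) {
               Map<StreamKind, String> row = new EnumMap<>(StreamKind.class);
               // Assumption: a broadcast stream cannot be converted, so its row stays empty.
               if (source != StreamKind.BROADCAST) {
                   // Global -> Global is marked as not required, so that cell is left out.
                   if (source != StreamKind.GLOBAL) {
                       row.put(StreamKind.GLOBAL, "Global");
                   }
                   row.put(StreamKind.KEYED, "KeyBy");
                   row.put(StreamKind.NON_KEYED, "Shuffle");
                   row.put(StreamKind.BROADCAST, "Broadcast");
               }
               TABLE.put(source, row);
           }
       }

       private PartitioningTable() {}

       /** Returns the partitioning that turns source into target, if the table lists one. */
       static Optional<String> partitioningFor(StreamKind source, StreamKind target) {
           return Optional.ofNullable(TABLE.get(source).get(target));
       }
   }
   ```

   If this reading is right, `partitioningFor(StreamKind.NON_KEYED, StreamKind.KEYED)` 
   corresponds to the `KeyBy` example given just above the table, and a sentence 
   stating the row/column meaning would be enough in the doc.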



##########
docs/content/docs/dev/datastream-v2/overview.md:
##########
@@ -0,0 +1,195 @@
+---
+title: Overview
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink DataStream API Programming Guide
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data 
streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned 
via sinks, which may for
+example write the data to files, or to standard output (for example the 
command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+Note: DataStream API V2 is a new set of APIs, to gradually replace the 
original DataStream API. It is currently in the experimental stage and is not 
fully available for production.
+
+## What is a DataStream?
+
+The DataStream API gets its name from the special `DataStream` class that is
+used to represent a collection of data in a Flink program. You can think of
+them as immutable collections of data that can contain duplicates. This data
+can either be finite or unbounded, the API that you use to work on them is the
+same.
+
+A `DataStream` is similar to a regular Java `Collection` in terms of usage but
+is quite different in some key ways. They are immutable, meaning that once they
+are created you cannot add or remove elements. You can also not simply inspect
+the elements inside but only work on them using the `DataStream` API
+operations, which are also called transformations.
+
+You can create an initial `DataStream` by adding a source in a Flink program.
+Then you can derive new streams from this and combine them by using API methods
+such as `process`, `connectAndProcess`, and so on.
+
+## Fundamental Primitives and Extensions
+
+Based on whether its functionality must be provided by flink, we divide the 
relevant concepts in DataStream API 
+into two categories: fundamental primitives and high-Level extensions.
+
+### Fundamental primitives
+
+Fundamental primitives are the basic and necessary semantics that flink need 
to provide in order to
+define a stateful stream processing application, which cannot be achieved by 
users if not provided by
+the framework. It includes dataStream, partitioning, process function, state, 
processing timer service,
+watermark and async processing(not provided for now).
+
+More details can be found in:
+- [Building Blocks]({{< ref "docs/dev/datastream-v2/building_blocks" >}}): 
Given the most basic elements of DataStream API.
+- [State Processing]({{< ref 
"docs/dev/datastream-v2/context_and_state_processing" >}}): Explanation of how 
to develop stateful applications.
+- [Time Processing # Processing Timer Service]({{< ref 
"docs/dev/datastream-v2/time-processing/processing_timer_service" >}}): 
Explanation of how to handle processing time.
+- [Watermark]({{< ref "docs/dev/datastream-v2/watermark/define_watermark" 
>}}): Explanation of how to define and handle user defined events.
+
+### High-Level Extensions
+
+High-Level extensions are like short-cuts / sugars, without which users can 
probably still achieve the same
+behavior by working with the fundamental APIs, but would be a lot easier with 
the builtin supports. 
+It includes common built-in functions(e.g. map, filter, reduce, etc. not 
provided for now), event timer service, window and join.
+
+More details can be found in:
+- [Time Processing # Event Timer Service]({{< ref 
"docs/dev/datastream-v2/time-processing/event_timer_service" >}}): Explanation 
of how to handle event time via extension.
+- [Builtin Functions]({{< ref "docs/dev/datastream-v2/builtin-funcs/windows" 
>}}): Explanation of how to do window aggregation and join via extension.
+
+## Anatomy of a Flink DataStream Program
+
+Flink programs look like regular programs that transform `DataStream`.  Each
+program consists of the same basic parts:
+
+1. Obtain an `Execution Environment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations,
+5. Trigger the program execution
+
+### Obtain an `Execution Environment`
+
+We will now give an overview of each of those steps, please refer to the
+respective sections for more details.
+
+The `ExecutionEnvironment` is the basis for all Flink programs. You can
+obtain one using these static methods on `ExecutionEnvironment`:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+```
+
+If you are executing your program inside an IDE or as a regular Java program 
it will create a local environment
+that will execute your program on your local machine. If you created a JAR file
+from your program, and invoke it through the command line, the Flink cluster 
manager will execute your main method and
+`ExecutionEnvironment.getInstance()` will return an execution environment for 
executing
+your program on a cluster.
+
+
+### Load/create the Initial Data
+
+Sources are where your program reads its input from. You can attach a source 
to your program by
+using `ExecutionEnvironment.fromSource(source, sourceName)`. Flink comes with 
a number of pre-implemented
+source, you can use [FLIP-27](https://cwiki.apache.org/confluence/x/co_zBQ) 
based source via `DataStreamV2SourceUtils.wrapSource(source)` or use 
+`DataStreamV2SourceUtils.fromData(collection)` for testing/debugging purpose.
+
+As an example, to just read data from a predefined collection, you can use:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+NonKeyedPartitionStream<String> input = 
+        env.fromSource(
+                DataStreamV2SourceUtils.fromData(Arrays.asList("1", "2", 
"3")), 
+                "source"
+        );
+```
+
+Since data from source has no clear partitioning, this will give you a 
`NonKeyedPartitionStream` on which you can then apply transformations to create 
new
+derived DataStreams. For other types of DataStream, see [Building Blocks # 
DataStream]({{< ref "docs/dev/datastream-v2/building_blocks" >}}#datastream).
+
+### Specify Transformations on this Data
+
+You apply transformations by calling methods on DataStream with a
+`ProcessFunction` (using a lambda expression here for simplicity). For example, a 
map transformation looks like this:
+
+```java
+NonKeyedPartitionStream<String> input = ...;
+
+NonKeyedPartitionStream<Integer> parsed = input.process(
+        (OneInputStreamProcessFunction<String, Integer>)
+                (record, output, ctx) -> {
+                    output.collect(Integer.parseInt(record));
+                });
+```
+
+This will create a new DataStream by converting every String in the original
+collection to an Integer. For more details of processing, see [Building Blocks 
# Process Function]({{< ref "docs/dev/datastream-v2/building_blocks" 
>}}#processfunction). 
+
+### Specify Where to Put the Results of Your Computations
+
+Once you have a DataStream containing your final results, you can write it to
+an outside system by creating a sink. 

Review Comment:
   ```suggestion
   an external system by creating a sink. 
   ```



##########
docs/content/docs/dev/datastream-v2/building_blocks.md:
##########
@@ -0,0 +1,215 @@
+---
+title: Building Blocks
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Building Blocks
+
+DataStream, Partitioning, ProcessFunction are the most fundamental elements of 
DataStream API and respectively represent:
+
+- What are the types of data streams
+
+- How data is partitioned
+
+- How to perform operations / processing on data streams
+
+They are also the core parts of the fundamental primitives provided by 
DataStream API.
+
+## DataStream
+
+Data flows on the stream may be divided into multiple partitions. According to 
how the data is partitioned on the stream, we divide it into the following 
categories:
+
+- Global Stream: Force single partition/parallelism, and the correctness of 
data depends on this.
+
+- Partition Stream:
+  - Divide data into multiple partitions. State is only available within the 
partition. One partition can only be processed by one task, but one task can 
handle one or multiple partitions.
+According to the partitioning approach, it can be further divided into the 
following two categories:
+
+    - Keyed Partition Stream: Each key is a partition, and the partition to 
which the data belongs is deterministic.
+
+    - Non-Keyed Partition Stream: Each parallelism is a partition, and the 
partition to which the data belongs is nondeterministic.
+
+- Broadcast Stream: Each partition contains the same data.
+
+## Partitioning
+
+Above we defined the stream and how it is partitioned. The next topic to 
discuss is how to convert
+between different partition types. We call these transformations partitioning.
+For example non-keyed partition stream can be transformed to keyed partition 
stream via a `KeyBy` partitioning.
+
+```java
+NonKeyedPartitionStream<Tuple<Integer, String>> stream = ...;
+KeyedPartitionStream<Integer, String> keyedStream = stream.keyBy(record -> 
record.f0);
+```
+
+Overall, we have the following four partitioning:
+ 
+- KeyBy: Let all data be repartitioned according to specified key.
+
+- Shuffle: Repartition and shuffle all data.
+
+- Global: Merge all partitions into one. 
+
+- Broadcast: Force partitions broadcast data to downstream.
+
+The specific transformation relationship is shown in the following table:
+
+| Partitioning | Global | Keyed | NonKeyed |      Broadcast      |
+|:------------:|:------:|:-----:|:--------:|:-------------------:|
+|    Global    |   ❎    | KeyBy | Shuffle  |      Broadcast     |
+|    Keyed     | Global | KeyBy | Shuffle  |      Broadcast      |
+|   NonKeyed   | Global | KeyBy | Shuffle  |      Broadcast      |
+|  Broadcast   |   ❎   |  ❎   |    ❎    |          ❎        |
+
+(A crossed box indicates that it is not supported or not required)
+
+One thing to note is: broadcast can only be used in conjunction with other 
inputs and cannot be directly converted to other streams.
+
+## ProcessFunction
+Once we have the data stream, we can apply operations on it. The operations 
that can be performed over
+DataStream are collectively called Process Function. It is the only entrypoint 
for defining all kinds
+of processing on the data streams.
+
+### Classification of ProcessFunction
+According to the number of input / output, they are classified as follows:
+
+|               Partitioning                |    Number of Inputs    |        Number of Outputs        |
+|:-----------------------------------------:|:----------------------:|:-------------------------------:|
+|    OneInputStreamProcessFunction          |           1            |                1                |
+| TwoInputNonBroadcastStreamProcessFunction |           2            |                1                |
+|  TwoInputBroadcastStreamProcessFunction   |           2            |                1                |
+|      TwoOutputStreamProcessFunction       |           1            |                2                |
+
+Logically, process functions that support more inputs and outputs can be 
achieved by combining them, 
+but this implementation might be inefficient. If the call for this becomes 
louder, 
+we will consider supporting as many output edges as we want through a 
mechanism like OutputTag.
+But this loses the explicit generic type information that comes with using 
ProcessFunction.
+
+The case of two input is relatively special, and we have divided it into two 
categories:
+
+- TwoInputNonBroadcastStreamProcessFunction: Neither of its inputs is a 
BroadcastStream, so processing is only applied to a single partition.
+- TwoInputBroadcastStreamProcessFunction: One of its inputs is the 
BroadcastStream, so the processing of this input is applied to all partitions. 
The other side is a Keyed/Non-Keyed Stream, and its processing is applied to a 
single partition.
+
+DataStream has a series of `process` and `connectAndProcess` methods to 
transform the input stream or connect and transform two input streams via 
ProcessFunction.
+
+### Requirements for input and output streams
+
+The following two tables list the input and output stream combinations 
supported by OneInputStreamProcessFunction and TwoOutputStreamProcessFunction 
respectively.
+
+For OneInputStreamProcessFunction:
+
+| Input Stream |  Output Stream   |  
+|:------------:|:----------------:|
+|    Global    |      Global      |
+|    Keyed     | Keyed / NonKeyed |
+|   NonKeyed   |     NonKeyed     |
+|  Broadcast   |  Not Supported   |
+
+For TwoOutputStreamProcessFunction:
+
+| Input Stream |             Output Stream             |  
+|:------------:|:-------------------------------------:|
+|    Global    |            Global + Global            |
+|    Keyed     | Keyed + Keyed / Non-Keyed + Non-Keyed |
+|   NonKeyed   |          NonKeyed + NonKeyed          |
+|  Broadcast   |             Not Supported             |
+
+There are two points to note here:
+- When KeyedPartitionStream is used as input, the output can be either a 
KeyedPartitionStream or NonKeyedPartitionStream.
+For general data processing logic, how to partition data is uncertain, we can 
only expect a NonKeyedPartitionStream.
+If we do need a deterministic partition, we can follow it with a KeyBy 
partitioning.
+However, there are times when we know for sure that the partition of records 
will not change before
+and after processing, shuffle cost due to the extra partitioning can be 
avoided.
+To be safe, in this case we ask for a KeySelector for the output data, and the 
framework
+checks at runtime to see if this invariant is broken. The same is true for two 
output and two input counterparts.
+- Broadcast stream cannot be used as a single input.
+
+Things with two inputs is a little more complicated. The following table lists 
which streams are compatible with each other and the types of streams they 
output.
+
+A cross(❎) indicates not supported.
+
+|  Output   | Global |       Keyed        | NonKeyed |     Broadcast     |
+|:---------:|:------:|:------------------:|:--------:|:-----------------:|
+|  Global   | Global |         ❎          |    ❎     |         ❎      |
+|   Keyed   |   ❎   | NonKeyed / Keyed   |    ❎     | NonKeyed / Keyed  |
+| NonKeyed  |   ❎   |         ❎          | NonKeyed |     NonKeyed      |
+| Broadcast |   ❎   |  NonKeyed / Keyed  | NonKeyed |         ❎         |
+
+The reason why the connection between Global Stream and Non-Global Stream is 
not supported is that the number of partitions of GlobalStream is forced to be 
1, but it is generally not 1 for Non-Global Stream, which will cause conflicts 
when determining the number of partitions of the output stream. If necessary, 
they should be transformed into mutually compatible streams and then connected.
+Connecting two broadcast streams doesn't really make sense, because each 
parallelism would have exactly same input data from both streams and any 
process would be duplicated.
+The reason why the output of two keyed partition streams can be keyed or 
non-keyed is the same as we mentioned above in the case of single input.
+When we connect two KeyedPartitionStream, they must have the same key type, 
otherwise we can't decide how to merge the partitions of the two streams. At 
the same time, things like access state and register timer are also restricted 
to the partition itself, cross-partition interaction is not meaningful.
+
+The reasons why the connection between KeyedPartitionStream and 
NonKeyedPartitionStream is not supported are as follows:
+The data on KeyedStream is deterministic, but on NonKeyed is not. It is 
difficult to think of a scenario where the two need to be connected.
+This will complicate the state declaration and access rules. A more detailed 
discussion can be seen in the subsequent state-related sub-FLIP.
+If we see that most people have clear demands for this, we can support it in 
the future.
+
+## Config Process
+
+After defining the process functions, you may want to make some configurations 
for the properties of this processing.
+For example, set the parallelism and name of the process operation, etc.
+
+Therefore, the return value of `process`/`connectAndProcess` meets the 
following two requirements at the same time:
+
+- It should be a handle, allowing us to configure the previous processing.
+
+- It should be a new DataStream, allowing us to do further processing on it.
+
+The advantage of this is that configurations can be made more conveniently by 
continuously using `withXXX` . For example:
+
+```java
+inputStream
+  .process(func1) // do process 1
+  .withName("my-process-func") // configure name for process 1
+  .withParallelism(2) //  configure parallelism for process 1
+  .process(func2) //  do further process 2
+```

Review Comment:
   User documentation should introduce how to use the APIs, not why the API is 
designed the way it is.



##########
docs/content/docs/dev/datastream-v2/building_blocks.md:
##########
@@ -0,0 +1,215 @@
+---
+title: Building Blocks
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Building Blocks
+
+DataStream, Partitioning, ProcessFunction are the most fundamental elements of 
DataStream API and respectively represent:
+
+- What are the types of data streams
+
+- How data is partitioned
+
+- How to perform operations / processing on data streams
+
+They are also the core parts of the fundamental primitives provided by 
DataStream API.
+
+## DataStream
+
+Data flows on the stream may be divided into multiple partitions. According to 
how the data is partitioned on the stream, we divide it into the following 
categories:
+
+- Global Stream: Force single partition/parallelism, and the correctness of 
data depends on this.
+
+- Partition Stream:
+  - Divide data into multiple partitions. State is only available within the 
partition. One partition can only be processed by one task, but one task can 
handle one or multiple partitions.
+According to the partitioning approach, it can be further divided into the 
following two categories:
+
+    - Keyed Partition Stream: Each key is a partition, and the partition to 
which the data belongs is deterministic.
+
+    - Non-Keyed Partition Stream: Each parallelism is a partition, and the 
partition to which the data belongs is nondeterministic.
+
+- Broadcast Stream: Each partition contains the same data.
+
+## Partitioning
+
+Above we defined the stream and how it is partitioned. The next topic to 
discuss is how to convert
+between different partition types. We call these transformations partitioning.
+For example non-keyed partition stream can be transformed to keyed partition 
stream via a `KeyBy` partitioning.
+
+```java
+NonKeyedPartitionStream<Tuple<Integer, String>> stream = ...;
+KeyedPartitionStream<Integer, String> keyedStream = stream.keyBy(record -> 
record.f0);
+```
+
+Overall, we have the following four partitioning:
+ 
+- KeyBy: Let all data be repartitioned according to specified key.
+
+- Shuffle: Repartition and shuffle all data.
+
+- Global: Merge all partitions into one. 
+
+- Broadcast: Force partitions broadcast data to downstream.
+
+The specific transformation relationship is shown in the following table:
+
+| Partitioning | Global | Keyed | NonKeyed |      Broadcast      |
+|:------------:|:------:|:-----:|:--------:|:-------------------:|
+|    Global    |   ❎    | KeyBy | Shuffle  |      Broadcast     |
+|    Keyed     | Global | KeyBy | Shuffle  |      Broadcast      |
+|   NonKeyed   | Global | KeyBy | Shuffle  |      Broadcast      |
+|  Broadcast   |   ❎   |  ❎   |    ❎    |          ❎        |
+
+(A crossed box indicates that it is not supported or not required)
+
+One thing to note is: broadcast can only be used in conjunction with other 
inputs and cannot be directly converted to other streams.
+
+## ProcessFunction
+Once we have the data stream, we can apply operations on it. The operations 
that can be performed over
+DataStream are collectively called Process Function. It is the only entrypoint 
for defining all kinds
+of processing on the data streams.
+
+### Classification of ProcessFunction
+According to the number of input / output, they are classified as follows:
+
+|               Partitioning                |    Number of Inputs    |        Number of Outputs        |
+|:-----------------------------------------:|:----------------------:|:-------------------------------:|
+|    OneInputStreamProcessFunction          |           1            |                1                |
+| TwoInputNonBroadcastStreamProcessFunction |           2            |                1                |
+|  TwoInputBroadcastStreamProcessFunction   |           2            |                1                |
+|      TwoOutputStreamProcessFunction       |           1            |                2                |
+
+Logically, process functions that support more inputs and outputs can be 
achieved by combining them, 
+but this implementation might be inefficient. If the call for this becomes 
louder, 
+we will consider supporting as many output edges as we want through a 
mechanism like OutputTag.
+But this loses the explicit generic type information that comes with using 
ProcessFunction.
+
+The case of two input is relatively special, and we have divided it into two 
categories:
+
+- TwoInputNonBroadcastStreamProcessFunction: Neither of its inputs is a 
BroadcastStream, so processing is only applied to a single partition.
+- TwoInputBroadcastStreamProcessFunction: One of its inputs is the 
BroadcastStream, so the processing of this input is applied to all partitions. 
The other side is a Keyed/Non-Keyed Stream, and its processing is applied to a 
single partition.

Review Comment:
   It's good enough to say we have two types of two-input process functions, 
depending on whether one of the inputs is a broadcast stream. It's too early to 
explain which partition(s) the processing is applied to.



##########
docs/content/docs/dev/datastream-v2/overview.md:
##########
@@ -0,0 +1,195 @@
+---
+title: Overview
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink DataStream API Programming Guide
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data 
streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned 
via sinks, which may for
+example write the data to files, or to standard output (for example the 
command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+Note: DataStream API V2 is a new set of APIs, to gradually replace the 
original DataStream API. It is currently in the experimental stage and is not 
fully available for production.
+
+## What is a DataStream?
+
+The DataStream API gets its name from the special `DataStream` class that is
+used to represent a collection of data in a Flink program. You can think of
+them as immutable collections of data that can contain duplicates. This data
+can either be finite or unbounded, the API that you use to work on them is the
+same.
+
+A `DataStream` is similar to a regular Java `Collection` in terms of usage but
+is quite different in some key ways. They are immutable, meaning that once they
+are created you cannot add or remove elements. You can also not simply inspect
+the elements inside but only work on them using the `DataStream` API
+operations, which are also called transformations.
+
+You can create an initial `DataStream` by adding a source in a Flink program.
+Then you can derive new streams from this and combine them by using API methods
+such as `process`, `connectAndProcess`, and so on.
+
+## Fundamental Primitives and Extensions
+
+Based on whether its functionality must be provided by flink, we divide the 
relevant concepts in DataStream API 
+into two categories: fundamental primitives and high-Level extensions.
+
+### Fundamental primitives
+
+Fundamental primitives are the basic and necessary semantics that flink need 
to provide in order to
+define a stateful stream processing application, which cannot be achieved by 
users if not provided by
+the framework. It includes dataStream, partitioning, process function, state, 
processing timer service,
+watermark and async processing(not provided for now).

Review Comment:
   Or just don't mention it for now



##########
docs/content/docs/dev/datastream-v2/building_blocks.md:
##########
@@ -0,0 +1,215 @@
+---
+title: Building Blocks
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Building Blocks
+
+DataStream, Partitioning, ProcessFunction are the most fundamental elements of 
DataStream API and respectively represent:
+
+- What are the types of data streams
+
+- How data is partitioned
+
+- How to perform operations / processing on data streams
+
+They are also the core parts of the fundamental primitives provided by 
DataStream API.
+
+## DataStream
+
+Data flows on the stream may be divided into multiple partitions. According to 
how the data is partitioned on the stream, we divide it into the following 
categories:
+
+- Global Stream: Force single partition/parallelism, and the correctness of 
data depends on this.
+
+- Partition Stream:
+  - Divide data into multiple partitions. State is only available within the 
partition. One partition can only be processed by one task, but one task can 
handle one or multiple partitions.
+According to the partitioning approach, it can be further divided into the 
following two categories:
+
+    - Keyed Partition Stream: Each key is a partition, and the partition to 
which the data belongs is deterministic.
+
+    - Non-Keyed Partition Stream: Each parallelism is a partition, and the 
partition to which the data belongs is nondeterministic.
+
+- Broadcast Stream: Each partition contains the same data.
+
+## Partitioning
+
+Above we defined the stream and how it is partitioned. The next topic to 
discuss is how to convert
+between different partition types. We call these transformations partitioning.
+For example non-keyed partition stream can be transformed to keyed partition 
stream via a `KeyBy` partitioning.
+
+```java
+NonKeyedPartitionStream<Tuple<Integer, String>> stream = ...;
+KeyedPartitionStream<Integer, String> keyedStream = stream.keyBy(record -> 
record.f0);
+```
+
+Overall, we have the following four partitioning:
+ 
+- KeyBy: Let all data be repartitioned according to specified key.
+
+- Shuffle: Repartition and shuffle all data.
+
+- Global: Merge all partitions into one. 
+
+- Broadcast: Force partitions broadcast data to downstream.
+
+The specific transformation relationship is shown in the following table:
+
+| Partitioning | Global | Keyed | NonKeyed |      Broadcast      |
+|:------------:|:------:|:-----:|:--------:|:-------------------:|
+|    Global    |   ❎    | KeyBy | Shuffle  |      Broadcast     |
+|    Keyed     | Global | KeyBy | Shuffle  |      Broadcast      |
+|   NonKeyed   | Global | KeyBy | Shuffle  |      Broadcast      |
+|  Broadcast   |   ❎   |  ❎   |    ❎    |          ❎        |
+
+(A crossed box indicates that it is not supported or not required)
+
+One thing to note is: broadcast can only be used in conjunction with other 
inputs and cannot be directly converted to other streams.
+
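For illustration only, the transformation table above can be read as a lookup from (source stream, target stream) to a partitioning. The following is a minimal sketch in plain Java that encodes the table as quoted; `PartitioningTable`, `StreamKind`, and `partitioningFor` are hypothetical names chosen for this example and are not part of the DataStream V2 API or of this PR.

```java
import java.util.Optional;

/** Hypothetical model of the partitioning table; not Flink code. */
final class PartitioningTable {
    /** The four stream kinds named in the table (assumed names). */
    enum StreamKind { GLOBAL, KEYED, NON_KEYED, BROADCAST }

    private PartitioningTable() {}

    /**
     * Returns the partitioning the table lists for converting {@code from}
     * into {@code to}, or empty where the table shows a crossed box.
     */
    static Optional<String> partitioningFor(StreamKind from, StreamKind to) {
        // A broadcast stream cannot be converted to any other stream.
        if (from == StreamKind.BROADCAST) {
            return Optional.empty();
        }
        // Global -> Global is marked as not supported / not required.
        if (from == StreamKind.GLOBAL && to == StreamKind.GLOBAL) {
            return Optional.empty();
        }
        // For every other source, the target kind alone picks the partitioning.
        switch (to) {
            case GLOBAL:
                return Optional.of("Global");
            case KEYED:
                return Optional.of("KeyBy");
            case NON_KEYED:
                return Optional.of("Shuffle");
            case BROADCAST:
                return Optional.of("Broadcast");
            default:
                throw new AssertionError(to);
        }
    }

    public static void main(String[] args) {
        // Print the whole table, one source/target pair per line.
        for (StreamKind from : StreamKind.values()) {
            for (StreamKind to : StreamKind.values()) {
                String cell = partitioningFor(from, to).orElse("not supported / not required");
                System.out.println(from + " -> " + to + ": " + cell);
            }
        }
    }
}
```

The sketch makes one property of the table explicit: apart from the two special cases, the partitioning depends only on the target stream kind.
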
+## ProcessFunction
+Once we have the data stream, we can apply operations on it. The operations 
that can be performed over
+DataStream are collectively called Process Function. It is the only entrypoint 
for defining all kinds
+of processing on the data streams.
+
+### Classification of ProcessFunction
+According to the number of input / output, they are classified as follows:
+
+|               Partitioning                |    Number of Inputs    |        Number of Outputs        |
+|:-----------------------------------------:|:----------------------:|:-------------------------------:|
+|    OneInputStreamProcessFunction          |           1            |                1                |
+| TwoInputNonBroadcastStreamProcessFunction |           2            |                1                |
+|  TwoInputBroadcastStreamProcessFunction   |           2            |                1                |
+|      TwoOutputStreamProcessFunction       |           1            |                2                |
+
+Logically, process functions that support more inputs and outputs can be 
achieved by combining them, 
+but this implementation might be inefficient. If the call for this becomes 
louder, 
+we will consider supporting as many output edges as we want through a 
mechanism like OutputTag.
+But this loses the explicit generic type information that comes with using 
ProcessFunction.
+
+The case of two input is relatively special, and we have divided it into two 
categories:
+
+- TwoInputNonBroadcastStreamProcessFunction: Neither of its inputs is a 
BroadcastStream, so processing only applied to the single partition.
+- TwoInputBroadcastStreamProcessFunction: One of its inputs is the 
BroadcastStream, so the processing of this input is applied to all partitions. 
While the other side is Keyed/Non-Keyed Stream, it's processing applied to 
single partition.
+
+DataStream has series of `process` and `connectAndProcess` methods to 
transform the input stream or connect and transform two input streams via 
ProcssFunction.
+
+### Requirements for input and output streams
+
+The following two tables list the input and output stream combinations 
supported by OneInputStreamProcessFunction and TwoOutputStreamProcessFunction 
respectively.
+
+For OneInputStreamProcessFunction:
+
+| Input Stream |  Output Stream   |  
+|:------------:|:----------------:|
+|    Global    |      Global      |
+|    Keyed     | Keyed / NonKeyed |
+|   NonKeyed   |     NonKeyed     |
+|  Broadcast   |  Not Supported   |
+
+For TwoOutputStreamProcessFunction:
+
+| Input Stream |             Output Stream             |  
+|:------------:|:-------------------------------------:|
+|    Global    |            Global + Global            |
+|    Keyed     | Keyed + Keyed / Non-Keyed + Non-Keyed |
+|   NonKeyed   |          NonKeyed + NonKeyed          |
+|  Broadcast   |             Not Supported             |
+
+There are two points to note here:
+- When KeyedPartitionStream is used as input, the output can be either a 
KeyedPartitionStream or NonKeyedPartitionStream.
+For general data processing logic, how to partition data is uncertain, we can 
only expect a NonKeyedPartitionStream.
+If we do need a deterministic partition, we can follow it with a KeyBy 
partitioning.
+However, there are times when we know for sure that the partition of records 
will not change before
+and after processing, shuffle cost due to the extra partitioning can be 
avoided.
+To be safe, in this case we ask for a KeySelector for the output data, and the 
framework
+checks at runtime to see if this invariant is broken. The same is true for two 
output and two input counterparts.
+- Broadcast stream cannot be used as a single input.
+
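The runtime check described in the first point can be pictured as follows. This is a minimal sketch in plain Java under stated assumptions: `KeyInvariantCheck` and `checkSameKey` are hypothetical names, and `java.util.function.Function` stands in for the KeySelector mentioned in the text. It is not how the framework implements the check; it only shows the invariant being tested, namely that an output record maps to the same key as the partition that produced it.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical illustration of the key-invariant check; not Flink code. */
final class KeyInvariantCheck {
    private KeyInvariantCheck() {}

    /**
     * Returns {@code output} unchanged if the key extracted from it equals the
     * key of the input partition; throws otherwise.
     */
    static <K, OUT> OUT checkSameKey(K inputKey, OUT output, Function<OUT, K> outputKeySelector) {
        K outputKey = outputKeySelector.apply(output);
        if (!Objects.equals(inputKey, outputKey)) {
            throw new IllegalStateException(
                    "Output key " + outputKey + " differs from input key " + inputKey);
        }
        return output;
    }

    public static void main(String[] args) {
        // Assumed example: records are strings keyed by their length.
        Function<String, Integer> byLength = String::length;

        // Key 3 in, record of length 3 out: the invariant holds.
        System.out.println(checkSameKey(3, "abc", byLength));

        // Key 3 in, record of length 4 out: the invariant is broken.
        try {
            checkSameKey(3, "abcd", byLength);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```
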
+Things with two inputs is a little more complicated. The following table lists 
which streams are compatible with each other and the types of streams they 
output.
+
+A cross(❎) indicates not supported.
+
+|  Output   | Global |       Keyed        | NonKeyed |     Broadcast     |
+|:---------:|:------:|:------------------:|:--------:|:-----------------:|
+|  Global   | Global |         ❎          |    ❎     |         ❎      |
+|   Keyed   |   ❎   | NonKeyed / Keyed   |    ❎     | NonKeyed / Keyed  |
+| NonKeyed  |   ❎   |         ❎          | NonKeyed |     NonKeyed      |
+| Broadcast |   ❎   |  NonKeyed / Keyed  | NonKeyed |         ❎         |
+
+The reason why the connection between Global Stream and Non-Global Stream is 
not supported is that the number of partitions of GlobalStream is forced to be 
1, but it is generally not 1 for Non-Global Stream, which will cause conflicts 
when determining the number of partitions of the output stream. If necessary, 
they should be transformed into mutually compatible streams and then connected.
+Connecting two broadcast streams doesn't really make sense, because each 
parallelism would have exactly same input data from both streams and any 
process would be duplicated.
+The reason why the output of two keyed partition streams can be keyed or 
non-keyed is the same as we mentioned above in the case of single input.
+When we connect two KeyedPartitionStream, they must have the same key type, 
otherwise we can't decide how to merge the partitions of the two streams. At 
the same time, things like access state and register timer are also restricted 
to the partition itself, cross-partition interaction is not meaningful.
+
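As a reading aid, the two-input compatibility table above can be encoded as a symmetric lookup. This is a minimal sketch in plain Java that mirrors the table as quoted; `TwoInputCompatibility`, `Kind`, and `outputKinds` are hypothetical names and not part of the DataStream V2 API. An empty result corresponds to a cross in the table.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the two-input compatibility table; not Flink code. */
final class TwoInputCompatibility {
    /** The four stream kinds named in the table (assumed names). */
    enum Kind { GLOBAL, KEYED, NON_KEYED, BROADCAST }

    private TwoInputCompatibility() {}

    /** Returns the output kinds the table allows for the two inputs; empty means not supported. */
    static Set<Kind> outputKinds(Kind first, Kind second) {
        // The table is symmetric, so move a broadcast input (if any) to the right.
        Kind left = first;
        Kind right = second;
        if (left == Kind.BROADCAST) {
            left = second;
            right = first;
        }
        // Still broadcast on the left: both inputs are broadcast streams.
        if (left == Kind.BROADCAST) {
            return EnumSet.noneOf(Kind.class);
        }
        // A global stream only connects to another global stream.
        if (left == Kind.GLOBAL || right == Kind.GLOBAL) {
            return (left == Kind.GLOBAL && right == Kind.GLOBAL)
                    ? EnumSet.of(Kind.GLOBAL)
                    : EnumSet.noneOf(Kind.class);
        }
        // Keyed connects to keyed or broadcast; the output may stay keyed.
        if (left == Kind.KEYED) {
            return (right == Kind.KEYED || right == Kind.BROADCAST)
                    ? EnumSet.of(Kind.KEYED, Kind.NON_KEYED)
                    : EnumSet.noneOf(Kind.class);
        }
        // Non-keyed connects to non-keyed or broadcast; the output is non-keyed.
        return (right == Kind.NON_KEYED || right == Kind.BROADCAST)
                ? EnumSet.of(Kind.NON_KEYED)
                : EnumSet.noneOf(Kind.class);
    }

    public static void main(String[] args) {
        // Print every input combination with its allowed output kinds.
        for (Kind a : Kind.values()) {
            for (Kind b : Kind.values()) {
                System.out.println(a + " + " + b + " -> " + outputKinds(a, b));
            }
        }
    }
}
```
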
+The reasons why the connection between KeyedPartitionStream and 
NonKeyedPartitionStream is not supported are as follows:
+The data on KeyedStream is deterministic, but on NonKeyed is not. It is 
difficult to think of a scenario where the two need to be connected.
+This will complicate the state declaration and access rules. A more detailed 
discussion can be seen in the subsequent state-related sub-FLIP.
+If we see that most people have clear demands for this, we can support it in 
the future.

Review Comment:
   Same here, not user-facing



##########
docs/content/docs/dev/datastream-v2/overview.md:
##########
@@ -0,0 +1,195 @@
+---
+title: Overview
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink DataStream API Programming Guide
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data 
streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned 
via sinks, which may for
+example write the data to files, or to standard output (for example the 
command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+Note: DataStream API V2 is a new set of APIs, to gradually replace the 
original DataStream API. It is currently in the experimental stage and is not 
fully available for production.
+
+## What is a DataStream?
+
+The DataStream API gets its name from the special `DataStream` class that is
+used to represent a collection of data in a Flink program. You can think of
+them as immutable collections of data that can contain duplicates. This data
+can either be finite or unbounded, the API that you use to work on them is the
+same.
+
+A `DataStream` is similar to a regular Java `Collection` in terms of usage but
+is quite different in some key ways. They are immutable, meaning that once they
+are created you cannot add or remove elements. You can also not simply inspect
+the elements inside but only work on them using the `DataStream` API
+operations, which are also called transformations.
+
+You can create an initial `DataStream` by adding a source in a Flink program.
+Then you can derive new streams from this and combine them by using API methods
+such as `process`, `connectAndProcess`, and so on.
+
+## Fundamental Primitives and Extensions
+
+Based on whether its functionality must be provided by flink, we divide the 
relevant concepts in DataStream API 
+into two categories: fundamental primitives and high-Level extensions.
+
+### Fundamental primitives
+
+Fundamental primitives are the basic and necessary semantics that flink need 
to provide in order to
+define a stateful stream processing application, which cannot be achieved by 
users if not provided by
+the framework. It includes dataStream, partitioning, process function, state, 
processing timer service,
+watermark and async processing(not provided for now).
+
+More details can be found in:
+- [Building Blocks]({{< ref "docs/dev/datastream-v2/building_blocks" >}}): 
Given the most basic elements of DataStream API.
+- [State Processing]({{< ref 
"docs/dev/datastream-v2/context_and_state_processing" >}}): Explanation of how 
to develop stateful applications.
+- [Time Processing # Processing Timer Service]({{< ref 
"docs/dev/datastream-v2/time-processing/processing_timer_service" >}}): 
Explanation of how to handle processing time.
+- [Watermark]({{< ref "docs/dev/datastream-v2/watermark/define_watermark" 
>}}): Explanation of how to define and handle user defined events.
+
+### High-Level Extensions
+
+High-Level extensions are like short-cuts / sugars, without which users can 
probably still achieve the same
+behavior by working with the fundamental APIs, but would be a lot easier with 
the builtin supports. 
+It includes common built-in functions(e.g. map, filter, reduce, etc. not 
provided for now), event timer service, window and join.

Review Comment:
   And same here for the not-provided features



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.