dianfu commented on a change in pull request #14068:
URL: https://github.com/apache/flink/pull/14068#discussion_r523956531



##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -637,6 +639,32 @@ def time_domain(self) -> TimeDomain:
             pass
 
 
+class KeyedProcessFunction(ProcessFunction, ABC):
+    """
+    A keyed function processes elements of a stream.
+
+    For every element in the input stream, process_element() is invoked. This 
can produce zero or
+    more elements as output. Implementations can also query the time and set 
timers through the
+    provided Context. For firing timers on_timer() will be invoked. This can 
again produce zero or
+    more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a 
key) is only available if
+    the KeyedProcessFunction is applied on a KeyedStream.
+    """
+
+    class Context(ProcessFunction.Context):
+
+        @abc.abstractmethod
+        def get_current_key(self):
+            pass
+
+    class OnTimerContext(ProcessFunction.OnTimerContext):

Review comment:
       What about not extending ProcessFunction? Otherwise, I think the type hints won't work well.

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1078,3 +1040,83 @@ def _get_connected_stream_operator(self,
 
     def _is_keyed_stream(self):
         return isinstance(self.stream1, KeyedStream) and 
isinstance(self.stream2, KeyedStream)
+
+
+def _get_java_python_function_operator(data_stream: DataStream,
+                                       func: Union[Function, FunctionWrapper],
+                                       type_info: TypeInformation,
+                                       func_type: int):
+    """
+    Create a flink operator according to user provided function object, data 
types,
+    function name and function type.
+
+    :param func: a function object that implements the Function interface.
+    :param type_info: the data type of the function output data.
+    :param func_type: function type, supports MAP, FLAT_MAP, etc.
+    :return: A flink java operator which is responsible for execution user 
defined python
+             function.
+    """
+
+    gateway = get_gateway()
+    import cloudpickle
+    serialized_func = cloudpickle.dumps(func)
+    j_input_types = 
data_stream._j_data_stream.getTransformation().getOutputType()
+    if type_info is None:
+        output_type_info = PickledBytesTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO()
+    elif isinstance(type_info, list):
+        output_type_info = RowTypeInfo(type_info)
+    else:
+        output_type_info = type_info
+
+    j_data_stream_python_function = gateway.jvm.DataStreamPythonFunction(
+        bytearray(serialized_func),
+        _get_python_env())
+    j_data_stream_python_function_info = 
gateway.jvm.DataStreamPythonFunctionInfo(
+        j_data_stream_python_function,
+        func_type)
+
+    j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
+
+    # set max bundle size to 1 to force synchronize process for reduce 
function.
+    from pyflink.fn_execution.flink_fn_execution_pb2 import 
UserDefinedDataStreamFunction
+    if func_type == UserDefinedDataStreamFunction.REDUCE:  # type: ignore
+        
j_conf.setInteger(gateway.jvm.org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE,
 1)
+
+        j_output_type_info = j_input_types.getTypeAt(1)
+        j_python_reduce_operator = gateway.jvm.PythonReduceOperator(
+            j_conf,
+            j_input_types,
+            j_output_type_info,
+            j_data_stream_python_function_info)
+        return j_python_reduce_operator, j_output_type_info
+    elif func_type == UserDefinedDataStreamFunction.PROCESS:  # type: ignore
+        j_python_process_function_operator = 
gateway.jvm.PythonProcessFunctionOperator(
+            j_conf,
+            j_input_types,
+            output_type_info.get_java_type_info(),
+            j_data_stream_python_function_info)
+        return j_python_process_function_operator, 
output_type_info.get_java_type_info()
+    elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS:  # type: 
ignore
+        j_python_process_function_operator = 
gateway.jvm.PythonKeyedProcessFunctionOperator(
+            j_conf,
+            j_input_types,
+            output_type_info.get_java_type_info(),
+            j_data_stream_python_function_info)
+        return j_python_process_function_operator, 
output_type_info.get_java_type_info()
+    else:
+        if str(func) == '_Flink_PartitionCustomMapFunction':
+            JDataStreamPythonFunctionOperator = 
gateway.jvm.PythonPartitionCustomOperator
+        elif func_type == UserDefinedDataStreamFunction.FLAT_MAP:  # type: 
ignore
+            JDataStreamPythonFunctionOperator = \
+                gateway.jvm.PythonFlatMapOperator
+        else:

Review comment:
       What about
   ```
   elif func_type == UserDefinedDataStreamFunction.PROCESS:
      XXX
   elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS:
      XXX
   ```?

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -873,6 +806,29 @@ def key_by(self, key_selector: Union[Callable, 
KeySelector],
                key_type_info: TypeInformation = None) -> 'KeyedStream':
         return self._origin_stream.key_by(key_selector, key_type_info)
 
+    def process(self, func: ProcessFunction, output_type: TypeInformation = 
None) -> 'DataStream':

Review comment:
       ```suggestion
       def process(self, func: KeyedProcessFunction, output_type: 
TypeInformation = None) -> 'DataStream':
   ```

##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -220,14 +219,26 @@ def wrap_func(value):
         co_map_func = user_defined_func
 
         def wrap_func(value):
-            return co_map_func.map1(value[1]) if value[0] else 
co_map_func.map2(value[2])
+            return Row(1, co_map_func.map1(value[1])) \

Review comment:
       What about introducing an Enum type to represent whether the element comes from the left or the right stream?

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoFlatMapOperator.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * The {@link PythonCoFlatMapOperator} is responsible for executing the Python 
CoMap Function.
+ *
+ * @param <IN1> The input type of the first stream
+ * @param <IN2> The input type of the second stream
+ * @param <OUT> The output type of the CoMap function
+ */
+public class PythonCoFlatMapOperator<IN1, IN2, OUT> extends 
TwoInputPythonFunctionOperator<IN1, IN2, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * A flag indicating whether it is the end of flatMap1 outputs.
+        */
+       private boolean endOfFlatMap1;

Review comment:
       This field should be declared `transient`.

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonFlatMapOperator.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+
+/**
+ * The {@link PythonFlatMapOperator} is responsible for executing Python 
functions that gets one
+ * input and produces zero/one or more outputs.
+ *
+ * @param <IN>     The type of the input elements
+ * @param <OUT>The type of the output elements
+ */
+public class PythonFlatMapOperator<IN, OUT> extends 
OneInputPythonFunctionOperator<IN, OUT> {

Review comment:
       This class should be annotated with `@Internal`.

##########
File path: 
flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -113,7 +113,8 @@ private static void alignStreamNode(StreamNode streamNode, 
StreamGraph streamGra
                }
 
                if 
(streamNode.getOperatorName().equals(STREAM_KEY_BY_MAP_OPERATOR_NAME) ||
-                       
streamNode.getOperatorName().equals(STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME)) 
{
+                       
streamNode.getOperatorName().equals(STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME) 
||
+                       
streamNode.getOperatorName().equals(STREAM_TIMESTAMP_AND_WATERMARK_OPERATOR_NAME))
 {

Review comment:
       It seems that this change is not necessary.

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations.py
##########
@@ -91,14 +91,25 @@ def create_aggregate_function(factory, transform_id, 
transform_proto, parameter,
 @bundle_processor.BeamTransformFactory.register_urn(
     operations.PROCESS_FUNCTION_URN,
     flink_fn_execution_pb2.UserDefinedDataStreamFunction)
-def create_data_stream_stateful_function(factory, transform_id, 
transform_proto, parameter,
-                                         consumers):
-    return _create_stateful_user_defined_function_operation(
+def create_data_stream_process_function(factory, transform_id, 
transform_proto, parameter,
+                                        consumers):
+    return _create_user_defined_function_operation(
         factory, transform_proto, consumers, parameter,
-        beam_operations.StatefulFunctionOperation,
+        beam_operations.StatelessFunctionOperation,
         operations.ProcessFunctionOperation)
 
 
+@bundle_processor.BeamTransformFactory.register_urn(
+    operations.KEYED_PROCESS_FUNCTION_URN,
+    flink_fn_execution_pb2.UserDefinedDataStreamFunction)
+def create_data_stream_keyed_process_function(factory, transform_id, 
transform_proto, parameter,
+                                              consumers):
+    return _create_user_defined_process_function_operation(

Review comment:
       What about merging `_create_user_defined_process_function_operation` into `_create_user_defined_function_operation`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to