dianfu commented on a change in pull request #14068: URL: https://github.com/apache/flink/pull/14068#discussion_r523956531
########## File path: flink-python/pyflink/datastream/functions.py ########## @@ -637,6 +639,32 @@ def time_domain(self) -> TimeDomain: pass +class KeyedProcessFunction(ProcessFunction, ABC): + """ + A keyed function processes elements of a stream. + + For every element in the input stream, process_element() is invoked. This can produce zero or + more elements as output. Implementations can also query the time and set timers through the + provided Context. For firing timers on_timer() will be invoked. This can again produce zero or + more elements as output and register further timers. + + Note that access to keyed state and timers (which are also scoped to a key) is only available if + the KeyedProcessFunction is applied on a KeyedStream. + """ + + class Context(ProcessFunction.Context): + + @abc.abstractmethod + def get_current_key(self): + pass + + class OnTimerContext(ProcessFunction.OnTimerContext): Review comment: What about don't extend ProcessFunction? Otherwise, I think type hint doesn't work well. ########## File path: flink-python/pyflink/datastream/data_stream.py ########## @@ -1078,3 +1040,83 @@ def _get_connected_stream_operator(self, def _is_keyed_stream(self): return isinstance(self.stream1, KeyedStream) and isinstance(self.stream2, KeyedStream) + + +def _get_java_python_function_operator(data_stream: DataStream, + func: Union[Function, FunctionWrapper], + type_info: TypeInformation, + func_type: int): + """ + Create a flink operator according to user provided function object, data types, + function name and function type. + + :param func: a function object that implements the Function interface. + :param type_info: the data type of the function output data. + :param func_type: function type, supports MAP, FLAT_MAP, etc. + :return: A flink java operator which is responsible for execution user defined python + function. 
+ """ + + gateway = get_gateway() + import cloudpickle + serialized_func = cloudpickle.dumps(func) + j_input_types = data_stream._j_data_stream.getTransformation().getOutputType() + if type_info is None: + output_type_info = PickledBytesTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO() + elif isinstance(type_info, list): + output_type_info = RowTypeInfo(type_info) + else: + output_type_info = type_info + + j_data_stream_python_function = gateway.jvm.DataStreamPythonFunction( + bytearray(serialized_func), + _get_python_env()) + j_data_stream_python_function_info = gateway.jvm.DataStreamPythonFunctionInfo( + j_data_stream_python_function, + func_type) + + j_conf = gateway.jvm.org.apache.flink.configuration.Configuration() + + # set max bundle size to 1 to force synchronize process for reduce function. + from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction + if func_type == UserDefinedDataStreamFunction.REDUCE: # type: ignore + j_conf.setInteger(gateway.jvm.org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE, 1) + + j_output_type_info = j_input_types.getTypeAt(1) + j_python_reduce_operator = gateway.jvm.PythonReduceOperator( + j_conf, + j_input_types, + j_output_type_info, + j_data_stream_python_function_info) + return j_python_reduce_operator, j_output_type_info + elif func_type == UserDefinedDataStreamFunction.PROCESS: # type: ignore + j_python_process_function_operator = gateway.jvm.PythonProcessFunctionOperator( + j_conf, + j_input_types, + output_type_info.get_java_type_info(), + j_data_stream_python_function_info) + return j_python_process_function_operator, output_type_info.get_java_type_info() + elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS: # type: ignore + j_python_process_function_operator = gateway.jvm.PythonKeyedProcessFunctionOperator( + j_conf, + j_input_types, + output_type_info.get_java_type_info(), + j_data_stream_python_function_info) + return j_python_process_function_operator, 
output_type_info.get_java_type_info() + else: + if str(func) == '_Flink_PartitionCustomMapFunction': + JDataStreamPythonFunctionOperator = gateway.jvm.PythonPartitionCustomOperator + elif func_type == UserDefinedDataStreamFunction.FLAT_MAP: # type: ignore + JDataStreamPythonFunctionOperator = \ + gateway.jvm.PythonFlatMapOperator + else: Review comment: What about ``` elif func_type == UserDefinedDataStreamFunction.PROCESS: XXX elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS: XXX ```? ########## File path: flink-python/pyflink/datastream/data_stream.py ########## @@ -873,6 +806,29 @@ def key_by(self, key_selector: Union[Callable, KeySelector], key_type_info: TypeInformation = None) -> 'KeyedStream': return self._origin_stream.key_by(key_selector, key_type_info) + def process(self, func: ProcessFunction, output_type: TypeInformation = None) -> 'DataStream': Review comment: ```suggestion def process(self, func: KeyedProcessFunction, output_type: TypeInformation = None) -> 'DataStream': ``` ########## File path: flink-python/pyflink/fn_execution/operation_utils.py ########## @@ -220,14 +219,26 @@ def wrap_func(value): co_map_func = user_defined_func def wrap_func(value): - return co_map_func.map1(value[1]) if value[0] else co_map_func.map2(value[2]) + return Row(1, co_map_func.map1(value[1])) \ Review comment: What about introducing Enum type to represent if it's from left or right stream? ########## File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoFlatMapOperator.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.python; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; +import org.apache.flink.streaming.api.utils.PythonOperatorUtils; +import org.apache.flink.types.Row; + +/** + * The {@link PythonCoFlatMapOperator} is responsible for executing the Python CoMap Function. + * + * @param <IN1> The input type of the first stream + * @param <IN2> The input type of the second stream + * @param <OUT> The output type of the CoMap function + */ +public class PythonCoFlatMapOperator<IN1, IN2, OUT> extends TwoInputPythonFunctionOperator<IN1, IN2, OUT> { + + private static final long serialVersionUID = 1L; + + /** + * A flag indicating whether it is the end of flatMap1 outputs. + */ + private boolean endOfFlatMap1; Review comment: transient ########## File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonFlatMapOperator.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.python; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; +import org.apache.flink.streaming.api.utils.PythonOperatorUtils; + +/** + * The {@link PythonFlatMapOperator} is responsible for executing Python functions that gets one + * input and produces zero/one or more outputs. + * + * @param <IN> The type of the input elements + * @param <OUT>The type of the output elements + */ +public class PythonFlatMapOperator<IN, OUT> extends OneInputPythonFunctionOperator<IN, OUT> { Review comment: @Internal ########## File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java ########## @@ -113,7 +113,8 @@ private static void alignStreamNode(StreamNode streamNode, StreamGraph streamGra } if (streamNode.getOperatorName().equals(STREAM_KEY_BY_MAP_OPERATOR_NAME) || - streamNode.getOperatorName().equals(STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME)) { + streamNode.getOperatorName().equals(STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME) || + streamNode.getOperatorName().equals(STREAM_TIMESTAMP_AND_WATERMARK_OPERATOR_NAME)) { Review comment: seems that this is not necessary. 
########## File path: flink-python/pyflink/fn_execution/beam/beam_operations.py ########## @@ -91,14 +91,25 @@ def create_aggregate_function(factory, transform_id, transform_proto, parameter, @bundle_processor.BeamTransformFactory.register_urn( operations.PROCESS_FUNCTION_URN, flink_fn_execution_pb2.UserDefinedDataStreamFunction) -def create_data_stream_stateful_function(factory, transform_id, transform_proto, parameter, - consumers): - return _create_stateful_user_defined_function_operation( +def create_data_stream_process_function(factory, transform_id, transform_proto, parameter, + consumers): + return _create_user_defined_function_operation( factory, transform_proto, consumers, parameter, - beam_operations.StatefulFunctionOperation, + beam_operations.StatelessFunctionOperation, operations.ProcessFunctionOperation) +@bundle_processor.BeamTransformFactory.register_urn( + operations.KEYED_PROCESS_FUNCTION_URN, + flink_fn_execution_pb2.UserDefinedDataStreamFunction) +def create_data_stream_keyed_process_function(factory, transform_id, transform_proto, parameter, + consumers): + return _create_user_defined_process_function_operation( Review comment: What about merging _create_user_defined_process_function_operation into _create_user_defined_function_operation? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org