hequn8128 commented on a change in pull request #13066: URL: https://github.com/apache/flink/pull/13066#discussion_r466112647
########## File path: flink-python/pyflink/datastream/data_stream.py ########## @@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int): """ self._j_data_stream.setBufferTimeout(timeout_millis) return self + + def map(self, func, type_info=None): + """ + Applies a Map transformation on a DataStream. The transformation calls a MapFunction for + each element of the DataStream. Each MapFunction call returns exactly one element. The user + can also extend RichMapFunction to gain access to other features provided by the + RichFunction interface. + + Note that If user does not specify the output data type, the output data will be serialized + as pickle primitive byte array. + + :param func: The MapFunction that is called for each element of the DataStream. + :param type_info: The type information of the MapFunction output data. + :return: The transformed DataStream. + """ + if not isinstance(func, MapFunction): + if callable(func): + func = MapFunctionWrapper(func) + else: + raise TypeError("The input muster be MapFunction or a callable function") Review comment: The input must be a MapFunction ########## File path: flink-python/pyflink/datastream/data_stream.py ########## @@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int): """ self._j_data_stream.setBufferTimeout(timeout_millis) return self + + def map(self, func, type_info=None): Review comment: Add type hint. ``` from typing import Union, Callable def map(self, func: Union[Callable, MapFunction], type_info: TypeInformation = None) -> 'DataStream': ``` ########## File path: flink-python/pyflink/datastream/data_stream.py ########## @@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int): """ self._j_data_stream.setBufferTimeout(timeout_millis) return self + + def map(self, func, type_info=None): + """ + Applies a Map transformation on a DataStream. The transformation calls a MapFunction for + each element of the DataStream. 
Each MapFunction call returns exactly one element. The user + can also extend RichMapFunction to gain access to other features provided by the + RichFunction interface. + + Note that If user does not specify the output data type, the output data will be serialized + as pickle primitive byte array. + + :param func: The MapFunction that is called for each element of the DataStream. + :param type_info: The type information of the MapFunction output data. + :return: The transformed DataStream. + """ + if not isinstance(func, MapFunction): + if callable(func): + func = MapFunctionWrapper(func) + else: + raise TypeError("The input muster be MapFunction or a callable function") + func_name = "m_map_" + str(uuid.uuid1()) + j_python_data_stream_scalar_function_operator, output_type_info = \ + self._get_java_python_function_operator(func, + type_info, + func_name, + flink_fn_execution_pb2 + .UserDefinedDataStreamFunction.MAP) + return DataStream(self._j_data_stream.transform( + func_name, + output_type_info.get_java_type_info(), + j_python_data_stream_scalar_function_operator + )) + + def flat_map(self, func, type_info=None): Review comment: Type hint. ########## File path: flink-python/pyflink/datastream/data_stream.py ########## @@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int): """ self._j_data_stream.setBufferTimeout(timeout_millis) return self + + def map(self, func, type_info=None): + """ + Applies a Map transformation on a DataStream. The transformation calls a MapFunction for + each element of the DataStream. Each MapFunction call returns exactly one element. The user + can also extend RichMapFunction to gain access to other features provided by the + RichFunction interface. + + Note that If user does not specify the output data type, the output data will be serialized + as pickle primitive byte array. + Review comment: Add an example in the python docs. 
########## File path: flink-python/pyflink/datastream/data_stream.py ########## @@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int): """ self._j_data_stream.setBufferTimeout(timeout_millis) return self + + def map(self, func, type_info=None): + """ + Applies a Map transformation on a DataStream. The transformation calls a MapFunction for + each element of the DataStream. Each MapFunction call returns exactly one element. The user + can also extend RichMapFunction to gain access to other features provided by the + RichFunction interface. + + Note that If user does not specify the output data type, the output data will be serialized + as pickle primitive byte array. + + :param func: The MapFunction that is called for each element of the DataStream. + :param type_info: The type information of the MapFunction output data. + :return: The transformed DataStream. + """ + if not isinstance(func, MapFunction): + if callable(func): + func = MapFunctionWrapper(func) + else: + raise TypeError("The input muster be MapFunction or a callable function") + func_name = "m_map_" + str(uuid.uuid1()) + j_python_data_stream_scalar_function_operator, output_type_info = \ + self._get_java_python_function_operator(func, + type_info, + func_name, + flink_fn_execution_pb2 + .UserDefinedDataStreamFunction.MAP) + return DataStream(self._j_data_stream.transform( + func_name, + output_type_info.get_java_type_info(), + j_python_data_stream_scalar_function_operator + )) + + def flat_map(self, func, type_info=None): + """ + Applies a FlatMap transformation on a DataStream. The transformation calls a FlatMapFunction + for each element of the DataStream. Each FlatMapFunction call can return any number of + elements including none. The user can also extend RichFlatMapFunction to gain access to + other features provided by the RichFUnction. + + :param func: The FlatMapFunction that is called for each element of the DataStream. + :param type_info: The type information of output data. 
+ :return: The transformed DataStream. + """ + if not isinstance(func, FlatMapFunction): + if callable(func): + func = FlatMapFunctionWrapper(func) + else: + raise TypeError("The input muster be FlatMapFunction or a callable function") Review comment: Typo in the error message: "muster be" should be "must be a", i.e. "The input must be a FlatMapFunction ...". ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org