hequn8128 commented on a change in pull request #13066:
URL: https://github.com/apache/flink/pull/13066#discussion_r466112647



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int):
         """
         self._j_data_stream.setBufferTimeout(timeout_millis)
         return self
+
+    def map(self, func, type_info=None):
+        """
+        Applies a Map transformation on a DataStream. The transformation calls 
a MapFunction for
+        each element of the DataStream. Each MapFunction call returns exactly 
one element. The user
+        can also extend RichMapFunction to gain access to other features 
provided by the
+        RichFunction interface.
+
+        Note that If user does not specify the output data type, the output 
data will be serialized
+        as pickle primitive byte array.
+
+        :param func: The MapFunction that is called for each element of the 
DataStream.
+        :param type_info: The type information of the MapFunction output data.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(func, MapFunction):
+            if callable(func):
+                func = MapFunctionWrapper(func)
+            else:
+                raise TypeError("The input muster be MapFunction or a callable 
function")

Review comment:
       The input must be a MapFunction

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int):
         """
         self._j_data_stream.setBufferTimeout(timeout_millis)
         return self
+
+    def map(self, func, type_info=None):

Review comment:
       Add type hint.
   ```
   from typing import Union, Callable
   def map(self, func: Union[Callable, MapFunction], type_info: TypeInformation = None) -> 'DataStream':
   ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int):
         """
         self._j_data_stream.setBufferTimeout(timeout_millis)
         return self
+
+    def map(self, func, type_info=None):
+        """
+        Applies a Map transformation on a DataStream. The transformation calls 
a MapFunction for
+        each element of the DataStream. Each MapFunction call returns exactly 
one element. The user
+        can also extend RichMapFunction to gain access to other features 
provided by the
+        RichFunction interface.
+
+        Note that If user does not specify the output data type, the output 
data will be serialized
+        as pickle primitive byte array.
+
+        :param func: The MapFunction that is called for each element of the 
DataStream.
+        :param type_info: The type information of the MapFunction output data.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(func, MapFunction):
+            if callable(func):
+                func = MapFunctionWrapper(func)
+            else:
+                raise TypeError("The input muster be MapFunction or a callable 
function")
+        func_name = "m_map_" + str(uuid.uuid1())
+        j_python_data_stream_scalar_function_operator, output_type_info = \
+            self._get_java_python_function_operator(func,
+                                                    type_info,
+                                                    func_name,
+                                                    flink_fn_execution_pb2
+                                                    
.UserDefinedDataStreamFunction.MAP)
+        return DataStream(self._j_data_stream.transform(
+            func_name,
+            output_type_info.get_java_type_info(),
+            j_python_data_stream_scalar_function_operator
+        ))
+
+    def flat_map(self, func, type_info=None):

Review comment:
       Add type hints to this method's parameters and return value.

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int):
         """
         self._j_data_stream.setBufferTimeout(timeout_millis)
         return self
+
+    def map(self, func, type_info=None):
+        """
+        Applies a Map transformation on a DataStream. The transformation calls 
a MapFunction for
+        each element of the DataStream. Each MapFunction call returns exactly 
one element. The user
+        can also extend RichMapFunction to gain access to other features 
provided by the
+        RichFunction interface.
+
+        Note that If user does not specify the output data type, the output 
data will be serialized
+        as pickle primitive byte array.
+

Review comment:
       Add an example in the Python docs.

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -160,3 +167,120 @@ def set_buffer_timeout(self, timeout_millis: int):
         """
         self._j_data_stream.setBufferTimeout(timeout_millis)
         return self
+
+    def map(self, func, type_info=None):
+        """
+        Applies a Map transformation on a DataStream. The transformation calls 
a MapFunction for
+        each element of the DataStream. Each MapFunction call returns exactly 
one element. The user
+        can also extend RichMapFunction to gain access to other features 
provided by the
+        RichFunction interface.
+
+        Note that If user does not specify the output data type, the output 
data will be serialized
+        as pickle primitive byte array.
+
+        :param func: The MapFunction that is called for each element of the 
DataStream.
+        :param type_info: The type information of the MapFunction output data.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(func, MapFunction):
+            if callable(func):
+                func = MapFunctionWrapper(func)
+            else:
+                raise TypeError("The input muster be MapFunction or a callable 
function")
+        func_name = "m_map_" + str(uuid.uuid1())
+        j_python_data_stream_scalar_function_operator, output_type_info = \
+            self._get_java_python_function_operator(func,
+                                                    type_info,
+                                                    func_name,
+                                                    flink_fn_execution_pb2
+                                                    
.UserDefinedDataStreamFunction.MAP)
+        return DataStream(self._j_data_stream.transform(
+            func_name,
+            output_type_info.get_java_type_info(),
+            j_python_data_stream_scalar_function_operator
+        ))
+
+    def flat_map(self, func, type_info=None):
+        """
+        Applies a FlatMap transformation on a DataStream. The transformation 
calls a FlatMapFunction
+        for each element of the DataStream. Each FlatMapFunction call can 
return any number of
+        elements including none. The user can also extend RichFlatMapFunction 
to gain access to
+        other features provided by the RichFUnction.
+
+        :param func: The FlatMapFunction that is called for each element of 
the DataStream.
+        :param type_info: The type information of output data.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(func, FlatMapFunction):
+            if callable(func):
+                func = FlatMapFunctionWrapper(func)
+            else:
+                raise TypeError("The input muster be FlatMapFunction or a 
callable function")

Review comment:
       The input must be a FlatMapFunction




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to