dianfu commented on a change in pull request #15877:
URL: https://github.com/apache/flink/pull/15877#discussion_r631760702



##########
File path: flink-python/pyflink/fn_execution/beam/beam_coders.py
##########
@@ -64,274 +58,39 @@ def __repr__(self):
     common_urns.coders.LENGTH_PREFIX.urn, PassThroughLengthPrefixCoder)
 
 
-class BeamTableFunctionRowCoder(FastCoder):
-    """
-    Coder for Table Function Row.
-    """
+class BeamCoder(FastCoder):

Review comment:
       The name `BeamCoder` seems not quite suitable. What about rename it to 
`FlinkCoder`?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coders.py
##########
@@ -38,10 +34,8 @@
     beam_coder_impl = beam_coder_impl_slow
     BeamCoderImpl = lambda a: a
 
-from pyflink.fn_execution import flink_fn_execution_pb2, coders
-from pyflink.table.types import TinyIntType, SmallIntType, IntType, 
BigIntType, BooleanType, \
-    FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, 
TimeType, \
-    LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, 
ArrayType
+
+FLINK_CODER_URN = "flink:coder"

Review comment:
       ```suggestion
   FLINK_CODER_URN = "flink:coder:v1"
   ```

##########
File path: flink-python/pyflink/fn_execution/coders.py
##########
@@ -35,85 +38,110 @@
            'TimestampCoder', 'BasicArrayCoder', 'PrimitiveArrayCoder', 
'MapCoder', 'DecimalCoder',
            'TimeWindowCoder', 'CountWindowCoder']
 
-# table coders
-FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN = 
"flink:coder:schema:scalar_function:v1"
-FLINK_TABLE_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:table_function:v1"
-FLINK_AGGREGATE_FUNCTION_SCHEMA_CODER_URN = 
"flink:coder:schema:aggregate_function:v1"
-FLINK_SCALAR_FUNCTION_SCHEMA_ARROW_CODER_URN = 
"flink:coder:schema:scalar_function:arrow:v1"
-FLINK_SCHEMA_ARROW_CODER_URN = "flink:coder:schema:arrow:v1"
-FLINK_OVER_WINDOW_ARROW_CODER_URN = 
"flink:coder:schema:batch_over_window:arrow:v1"
-
-
-# datastream coders
-FLINK_MAP_CODER_URN = "flink:coder:map:v1"
-FLINK_FLAT_MAP_CODER_URN = "flink:coder:flat_map:v1"
-
 
+# BaseCoder will be used in Operations and other coders will be the field 
coder of BaseCoder
 class BaseCoder(ABC):
-    def get_impl(self):
-        pass
-
-    @staticmethod
-    def from_schema_proto(schema_proto):
-        pass
-
-
-class TableFunctionRowCoder(BaseCoder):
-    """
-    Coder for Table Function Row.
-    """
-
-    def __init__(self, flatten_row_coder):
-        self._flatten_row_coder = flatten_row_coder
+    def __init__(self, output_mode):
+        self._output_mode = output_mode
 
+    @abstractmethod
     def get_impl(self):
-        return 
coder_impl.TableFunctionRowCoderImpl(self._flatten_row_coder.get_impl())
-
-    @staticmethod
-    def from_schema_proto(coder_param_proto):
-        return 
TableFunctionRowCoder(FlattenRowCoder.from_schema_proto(coder_param_proto))
-
-    def __repr__(self):
-        return 'TableFunctionRowCoder[%s]' % repr(self._flatten_row_coder)
-
-    def __eq__(self, other):
-        return (self.__class__ == other.__class__
-                and self._flatten_row_coder == other._flatten_row_coder)
-
-    def __ne__(self, other):
-        return not self == other
+        pass
 
-    def __hash__(self):
-        return hash(self._flatten_row_coder)
+    @classmethod
+    def from_coder_param_proto(cls, coder_param_proto):
+        data_type = coder_param_proto.data_type
+        output_mode = coder_param_proto.output_mode
+        if data_type == flink_fn_execution_pb2.CoderParam.FLATTEN_ROW:
+            if coder_param_proto.HasField('schema'):
+                schema_proto = coder_param_proto.schema
+                field_coders = [from_proto(f.type) for f in 
schema_proto.fields]
+            else:
+                type_info_proto = coder_param_proto.type_info
+                field_coders = [from_type_info_proto(f.field_type)
+                                for f in type_info_proto.row_type_info.fields]
+            return FlattenRowCoder(field_coders, output_mode)
+        elif data_type == flink_fn_execution_pb2.CoderParam.ROW:
+            schema_proto = coder_param_proto.schema
+            field_coders = [from_proto(f.type) for f in schema_proto.fields]
+            field_names = [f.name for f in schema_proto.fields]
+            return TopRowCoder(field_coders, field_names, output_mode)
+        elif data_type == flink_fn_execution_pb2.CoderParam.RAW:
+            type_info_proto = coder_param_proto.type_info
+            field_coder = from_type_info_proto(type_info_proto)
+            return RawCoder(field_coder, output_mode)
+        elif data_type == flink_fn_execution_pb2.CoderParam.ARROW:
+            timezone = pytz.timezone(os.environ['table.exec.timezone'])
+            schema_proto = coder_param_proto.schema
+            row_type = cls._to_row_type(schema_proto)
+            return ArrowCoder(cls._to_arrow_schema(row_type), row_type, 
timezone, output_mode)
+        elif data_type == 
flink_fn_execution_pb2.CoderParam.BATCH_OVER_WINDOW_ARROW:

Review comment:
       What about rename it to `OVER_WINDOW_ARROW` to match with the class name 
`OverWindowArrowCoder `?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to