dianfu commented on a change in pull request #15877: URL: https://github.com/apache/flink/pull/15877#discussion_r631760702
########## File path: flink-python/pyflink/fn_execution/beam/beam_coders.py ########## @@ -64,274 +58,39 @@ def __repr__(self): common_urns.coders.LENGTH_PREFIX.urn, PassThroughLengthPrefixCoder) -class BeamTableFunctionRowCoder(FastCoder): - """ - Coder for Table Function Row. - """ +class BeamCoder(FastCoder): Review comment: The name `BeamCoder` doesn't seem quite suitable. What about renaming it to `FlinkCoder`? ########## File path: flink-python/pyflink/fn_execution/beam/beam_coders.py ########## @@ -38,10 +34,8 @@ beam_coder_impl = beam_coder_impl_slow BeamCoderImpl = lambda a: a -from pyflink.fn_execution import flink_fn_execution_pb2, coders -from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \ - FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \ - LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType + +FLINK_CODER_URN = "flink:coder" Review comment: ```suggestion FLINK_CODER_URN = "flink:coder:v1" ``` ########## File path: flink-python/pyflink/fn_execution/coders.py ########## @@ -35,85 +38,110 @@ 'TimestampCoder', 'BasicArrayCoder', 'PrimitiveArrayCoder', 'MapCoder', 'DecimalCoder', 'TimeWindowCoder', 'CountWindowCoder'] -# table coders -FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:scalar_function:v1" -FLINK_TABLE_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:table_function:v1" -FLINK_AGGREGATE_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:aggregate_function:v1" -FLINK_SCALAR_FUNCTION_SCHEMA_ARROW_CODER_URN = "flink:coder:schema:scalar_function:arrow:v1" -FLINK_SCHEMA_ARROW_CODER_URN = "flink:coder:schema:arrow:v1" -FLINK_OVER_WINDOW_ARROW_CODER_URN = "flink:coder:schema:batch_over_window:arrow:v1" - - -# datastream coders -FLINK_MAP_CODER_URN = "flink:coder:map:v1" -FLINK_FLAT_MAP_CODER_URN = "flink:coder:flat_map:v1" - +# BaseCoder will be used in Operations and other coders will be the field coder of BaseCoder class BaseCoder(ABC):
- def get_impl(self): - pass - - @staticmethod - def from_schema_proto(schema_proto): - pass - - -class TableFunctionRowCoder(BaseCoder): - """ - Coder for Table Function Row. - """ - - def __init__(self, flatten_row_coder): - self._flatten_row_coder = flatten_row_coder + def __init__(self, output_mode): + self._output_mode = output_mode + @abstractmethod def get_impl(self): - return coder_impl.TableFunctionRowCoderImpl(self._flatten_row_coder.get_impl()) - - @staticmethod - def from_schema_proto(coder_param_proto): - return TableFunctionRowCoder(FlattenRowCoder.from_schema_proto(coder_param_proto)) - - def __repr__(self): - return 'TableFunctionRowCoder[%s]' % repr(self._flatten_row_coder) - - def __eq__(self, other): - return (self.__class__ == other.__class__ - and self._flatten_row_coder == other._flatten_row_coder) - - def __ne__(self, other): - return not self == other + pass - def __hash__(self): - return hash(self._flatten_row_coder) + @classmethod + def from_coder_param_proto(cls, coder_param_proto): + data_type = coder_param_proto.data_type + output_mode = coder_param_proto.output_mode + if data_type == flink_fn_execution_pb2.CoderParam.FLATTEN_ROW: + if coder_param_proto.HasField('schema'): + schema_proto = coder_param_proto.schema + field_coders = [from_proto(f.type) for f in schema_proto.fields] + else: + type_info_proto = coder_param_proto.type_info + field_coders = [from_type_info_proto(f.field_type) + for f in type_info_proto.row_type_info.fields] + return FlattenRowCoder(field_coders, output_mode) + elif data_type == flink_fn_execution_pb2.CoderParam.ROW: + schema_proto = coder_param_proto.schema + field_coders = [from_proto(f.type) for f in schema_proto.fields] + field_names = [f.name for f in schema_proto.fields] + return TopRowCoder(field_coders, field_names, output_mode) + elif data_type == flink_fn_execution_pb2.CoderParam.RAW: + type_info_proto = coder_param_proto.type_info + field_coder = from_type_info_proto(type_info_proto) + return 
RawCoder(field_coder, output_mode) + elif data_type == flink_fn_execution_pb2.CoderParam.ARROW: + timezone = pytz.timezone(os.environ['table.exec.timezone']) + schema_proto = coder_param_proto.schema + row_type = cls._to_row_type(schema_proto) + return ArrowCoder(cls._to_arrow_schema(row_type), row_type, timezone, output_mode) + elif data_type == flink_fn_execution_pb2.CoderParam.BATCH_OVER_WINDOW_ARROW: Review comment: What about renaming it to `OVER_WINDOW_ARROW` to match the class name `OverWindowArrowCoder`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org