HuangXingBo commented on a change in pull request #13066: URL: https://github.com/apache/flink/pull/13066#discussion_r465656093
########## File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py ########## @@ -305,6 +368,39 @@ def decode_from_stream(self, in_stream, nested): return value +class BasicDecimalCoderImpl(StreamCoderImpl): + + def encode_to_stream(self, value, stream, nested): + bytes_value = str(value).encode("utf-8") + stream.write_bigendian_int32(len(bytes_value)) + stream.write(bytes_value, False) + + def decode_from_stream(self, stream, nested): + size = stream.read_bigendian_int32() + value = decimal.Decimal(stream.read(size).decode("utf-8")) + return value + + +class TupleCoderImpl(StreamCoderImpl): + def __init__(self, field_coders): + self._field_coders = field_coders + self._field_count = len(field_coders) + + def encode_to_stream(self, value, out_stream, nested): + field_coders = self._field_coders + for i in range(self._field_count): + field_coders[i].encode_to_stream(value[i], out_stream, nested) + + def decode_from_stream(self, stream, nested): + decoded_list = [] + for idx in range(self._field_count): Review comment: Maybe we can use a list comprehension here. ########## File path: flink-python/pyflink/fn_execution/coders.py ########## @@ -109,6 +112,10 @@ def __hash__(self): class FieldCoder(ABC): + + def get_slow_impl(self): + pass Review comment: I think this is a temporary solution for the non-Cython coder, right? ########## File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py ########## @@ -154,6 +167,13 @@ def create_table_function(factory, transform_id, transform_proto, parameter, con factory, transform_proto, consumers, parameter, TableFunctionOperation) +@bundle_processor.BeamTransformFactory.register_urn( + DATA_STREAM_FUNCTION_URN, flink_fn_execution_pb2.UserDefinedDataStreamFunctions) Review comment: Use `operation_utils.DATA_STREAM_FUNCTION_URN` and don't import `DATA_STREAM_FUNCTION_URN` directly.
########## File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py ########## @@ -186,6 +187,68 @@ def __repr__(self): return 'ArrayCoderImpl[%s]' % repr(self._elem_coder) +class PickledBytesCoderImpl(StreamCoderImpl): + + def __init__(self): + self.field_coder = BinaryCoderImpl() + + def encode_to_stream(self, value, out_stream, nested): + coded_data = pickle.dumps(value) + real_coded_data = self.field_coder.encode(coded_data) Review comment: Why not call the field_coder's `encode_to_stream` method directly? The `encode` method creates a temporary buffer. ########## File path: flink-python/pyflink/fn_execution/beam/beam_coders.py ########## @@ -209,3 +211,51 @@ def _to_row_type(row_schema): def __repr__(self): return 'ArrowCoder[%s]' % self._schema + + +class DataStreamStatelessMapCoder(FastCoder): + + def __init__(self, field_coder): + self._field_coder = field_coder Review comment: Since its field coder is of a specific type, we can use a more specific name. ########## File path: flink-python/pyflink/fn_execution/coders.py ########## @@ -271,6 +309,21 @@ def __init__(self, precision, scale): def get_impl(self): return coder_impl.DecimalCoderImpl(self.precision, self.scale) + def get_slow_impl(self): + return coder_impl_slow.DecimalCoderImpl(self.precision, self.scale) + + +class BasicDecimalCoder(FieldCoder): + """ + Coder for Basic Decimal that no need to have precision and scale specified. + """ + + def get_impl(self): + pass Review comment: I think returning None is not a good choice. Maybe you can raise an exception instead. ########## File path: flink-python/pyflink/fn_execution/tests/test_coders.py ########## @@ -154,6 +154,17 @@ def test_row_coder(self): v = Row(*[None if i % 2 == 0 else i for i in range(field_count)]) self.check_coder(coder, v) + def test_basic_decimal_coder(self): + basic_dec_coder = BasicDecimalCoder() + value = 1.200 Review comment: 1.200 is a float, not a Decimal.
########## File path: flink-python/pyflink/fn_execution/coders.py ########## @@ -332,6 +400,39 @@ def __init__(self, precision, timezone): def get_impl(self): return coder_impl.LocalZonedTimestampCoderImpl(self.precision, self.timezone) + def get_slow_impl(self): + return coder_impl_slow.LocalZonedTimestampCoderImpl(self.precision, self.timezone) + + +class PickledBytesCoder(FieldCoder): + + def get_impl(self): + return None + + def get_slow_impl(self): + return coder_impl_slow.PickledBytesCoderImpl() + + +class TupleCoder(FieldCoder): + + def __init__(self, field_coders): + self._field_coders = field_coders + + def get_slow_impl(self): + return self._create_impl() + + def get_impl(self): + return None Review comment: ditto ########## File path: flink-python/pyflink/fn_execution/coders.py ########## @@ -379,3 +480,40 @@ def from_proto(field_type): field_type.decimal_info.scale) else: raise ValueError("field_type %s is not supported." % field_type) + + +# for data stream type information. 
+type_info_name = flink_fn_execution_pb2.TypeInfo +_type_info_name_mappings = { + type_info_name.STRING: CharCoder(), + type_info_name.BYTE: TinyIntCoder(), + type_info_name.BOOLEAN: BooleanCoder(), + type_info_name.SHORT: SmallIntCoder(), + type_info_name.INT: IntCoder(), + type_info_name.LONG: BigIntCoder(), + type_info_name.FLOAT: FloatCoder(), + type_info_name.DOUBLE: DoubleCoder(), + type_info_name.CHAR: CharCoder(), + type_info_name.BIG_INT: BigIntCoder(), + type_info_name.BIG_DEC: BasicDecimalCoder(), + type_info_name.SQL_DATE: DateCoder(), + type_info_name.SQL_TIME: TimeCoder(), + type_info_name.SQL_TIMESTAMP: TimeCoder(), + type_info_name.LOCAL_DATE: DateCoder(), + type_info_name.PICKLED_BYTES: PickledBytesCoder() +} + + +def from_type_info_proto(field_type): + field_type_name = field_type.type_name + coder = _type_info_name_mappings.get(field_type_name) Review comment: Maybe we can write more Pythonic code: try: return _type_info_name_mappings[field_type_name] except KeyError: # other coder logic ########## File path: flink-python/pyflink/fn_execution/coders.py ########## @@ -332,6 +400,39 @@ def __init__(self, precision, timezone): def get_impl(self): return coder_impl.LocalZonedTimestampCoderImpl(self.precision, self.timezone) + def get_slow_impl(self): + return coder_impl_slow.LocalZonedTimestampCoderImpl(self.precision, self.timezone) + + +class PickledBytesCoder(FieldCoder): + + def get_impl(self): + return None Review comment: I think returning None is not a good choice.
Maybe you can raise an exception instead. ########## File path: flink-python/pyflink/fn_execution/beam/beam_coders.py ########## @@ -209,3 +211,51 @@ def _to_row_type(row_schema): def __repr__(self): return 'ArrowCoder[%s]' % self._schema + + +class DataStreamStatelessMapCoder(FastCoder): + + def __init__(self, field_coder): + self._field_coder = field_coder + + def _create_impl(self): + return beam_coder_impl_slow.DataStreamStatelessMapCoderImpl( + self._field_coder.get_slow_impl()) + + def is_deterministic(self): # type: () -> bool + return all(c.is_deterministic() for c in self._field_coder) + + @Coder.register_urn(FLINK_MAP_FUNCTION_DATA_STREAM_CODER_URN, flink_fn_execution_pb2.TypeInfo) + def _pickled_from_runner_api_parameter(type_info_proto, unused_components, unused_context): + return DataStreamStatelessMapCoder(from_type_info_proto(type_info_proto.field[0].type)) + + def to_type_hint(self): + pass + + def __repr__(self): + return 'DataStreamStatelessMapCoder[%s]' % repr(self._field_coder) + + +class DataStreamStatelessFlatMapCoder(FastCoder): + + def __init__(self, field_coder): + self._field_coder = field_coder + + def _create_impl(self): + return beam_coder_impl_slow.DataStreamStatelessFlatMapCoderImpl( + self._field_coder.get_impl()) + + def is_deterministic(self): # type: () -> bool + return all(c.is_deterministic() for c in self._field_coder) + + @Coder.register_urn(FLINK_FLAT_MAP_FUNCTION_DATA_STREAM_CODER_URN, + flink_fn_execution_pb2.TypeInfo) + def _pickled_from_runner_api_parameter(type_info_proto, unused_components, unused_context): + return DataStreamStatelessFlatMapCoder(DataStreamStatelessMapCoder( + from_type_info_proto(type_info_proto.field[0].type))) + + def to_type_hint(self): + pass Review comment: ditto ########## File path: flink-python/pyflink/fn_execution/coders.py ########## @@ -332,6 +400,39 @@ def __init__(self, precision, timezone): def get_impl(self): return coder_impl.LocalZonedTimestampCoderImpl(self.precision, self.timezone) + def 
get_slow_impl(self): + return coder_impl_slow.LocalZonedTimestampCoderImpl(self.precision, self.timezone) + + +class PickledBytesCoder(FieldCoder): + + def get_impl(self): + return None + + def get_slow_impl(self): + return coder_impl_slow.PickledBytesCoderImpl() + + +class TupleCoder(FieldCoder): + + def __init__(self, field_coders): + self._field_coders = field_coders + + def get_slow_impl(self): + return self._create_impl() + + def get_impl(self): + return None + + def _create_impl(self): + return coder_impl_slow.TupleCoderImpl([c.get_impl() for c in self._field_coders]) Review comment: Should this be `return coder_impl_slow.TupleCoderImpl([c.get_slow_impl() for c in self._field_coders])`? ########## File path: flink-python/pyflink/fn_execution/beam/beam_coders.py ########## @@ -209,3 +211,51 @@ def _to_row_type(row_schema): def __repr__(self): return 'ArrowCoder[%s]' % self._schema + + +class DataStreamStatelessMapCoder(FastCoder): + + def __init__(self, field_coder): + self._field_coder = field_coder + + def _create_impl(self): + return beam_coder_impl_slow.DataStreamStatelessMapCoderImpl( + self._field_coder.get_slow_impl()) + + def is_deterministic(self): # type: () -> bool + return all(c.is_deterministic() for c in self._field_coder) + + @Coder.register_urn(FLINK_MAP_FUNCTION_DATA_STREAM_CODER_URN, flink_fn_execution_pb2.TypeInfo) + def _pickled_from_runner_api_parameter(type_info_proto, unused_components, unused_context): + return DataStreamStatelessMapCoder(from_type_info_proto(type_info_proto.field[0].type)) + + def to_type_hint(self): + pass Review comment: Use `typehints.Generator`. ########## File path: flink-python/pyflink/fn_execution/coders.py ########## @@ -332,6 +400,39 @@ def __init__(self, precision, timezone): def get_impl(self): return coder_impl.LocalZonedTimestampCoderImpl(self.precision, self.timezone) + def get_slow_impl(self): + return coder_impl_slow.LocalZonedTimestampCoderImpl(self.precision, self.timezone) + + +class 
PickledBytesCoder(FieldCoder): + + def get_impl(self): + return None + + def get_slow_impl(self): + return coder_impl_slow.PickledBytesCoderImpl() + + +class TupleCoder(FieldCoder): + + def __init__(self, field_coders): + self._field_coders = field_coders + + def get_slow_impl(self): + return self._create_impl() + + def get_impl(self): + return None + + def _create_impl(self): + return coder_impl_slow.TupleCoderImpl([c.get_impl() for c in self._field_coders]) + + def to_type_hint(self): + return tuple Review comment: You should use `typehints.Tuple` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org