hequn8128 commented on a change in pull request #11342: [FLINK-16483][python] Add Python building blocks to make sure the basic functionality of vectorized Python UDF could work
URL: https://github.com/apache/flink/pull/11342#discussion_r389769215
########## File path: flink-python/pyflink/fn_execution/coder_impl.py ##########
@@ -373,3 +376,45 @@ def internal_to_timestamp(self, milliseconds, nanoseconds):
         second, microsecond = (milliseconds // 1000,
                                milliseconds % 1000 * 1000 + nanoseconds // 1000)
         return datetime.datetime.utcfromtimestamp(second).replace(microsecond=microsecond)
+
+
+class ArrowCoderImpl(StreamCoderImpl):
+
+    def __init__(self, schema):
+        self._schema = schema
+        self._resettable_io = ResettableIO()
+
+    def encode_to_stream(self, cols, out_stream, nested):
+        if not hasattr(self, "_batch_writer"):
+            self._batch_writer = pa.RecordBatchStreamWriter(self._resettable_io, self._schema)
+
+        self._resettable_io.set_output_stream(out_stream)
+        self._batch_writer.write_batch(self._create_batch(cols))
+
+    def decode_from_stream(self, in_stream, nested):
+        if not hasattr(self, "_batch_reader"):

Review comment:
I'm wondering if we can avoid these `hasattr` checks. Could we use `pa.ipc.open_stream` to read the stream directly and return the batch?