hequn8128 commented on a change in pull request #11342: [FLINK-16483][python] Add Python building blocks to make sure the basic functionality of vectorized Python UDF could work
URL: https://github.com/apache/flink/pull/11342#discussion_r389758551
##########
File path: flink-python/pyflink/fn_execution/coder_impl.py
##########
@@ -373,3 +376,45 @@ def internal_to_timestamp(self, milliseconds, nanoseconds):
         second, microsecond = (milliseconds // 1000,
                                milliseconds % 1000 * 1000 + nanoseconds // 1000)
         return datetime.datetime.utcfromtimestamp(second).replace(microsecond=microsecond)
+
+
+class ArrowCoderImpl(StreamCoderImpl):
+
+    def __init__(self, schema):
+        self._schema = schema
+        self._resettable_io = ResettableIO()
+
+    def encode_to_stream(self, cols, out_stream, nested):
+        if not hasattr(self, "_batch_writer"):
+            self._batch_writer = pa.RecordBatchStreamWriter(self._resettable_io, self._schema)
+
+        self._resettable_io.set_output_stream(out_stream)
+        self._batch_writer.write_batch(self._create_batch(cols))
+
+    def decode_from_stream(self, in_stream, nested):
+        if not hasattr(self, "_batch_reader"):
+            def load_from_stream(stream):
+                reader = pa.ipc.open_stream(stream)
+                for batch in reader:
+                    yield batch
+
+            self._batch_reader = load_from_stream(self._resettable_io)
+
+        self._resettable_io.set_input_bytes(in_stream.read_all())
+        table = pa.Table.from_batches([next(self._batch_reader)])

Review comment:
Why do we only get the first batch in the batch list?