dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask
URL: https://github.com/apache/flink/pull/11118#discussion_r381768526
########## File path: flink-python/pyflink/fn_execution/coder_impl.py ########## @@ -24,57 +24,71 @@ from pyflink.table import Row +def generate_null_mask_search_table(): + """ + Each bit of one byte represents if the column at the specified position is None or not, e.g. + 0x84 represents the first column and the sixth column are None. + """ + num = 256 + null_mask = [] + for b in range(num): + every_num_null_mask = [(b & 0x80) > 0, (b & 0x40) > 0, (b & 0x20) > 0, (b & 0x10) > 0, + (b & 0x08) > 0, (b & 0x04) > 0, (b & 0x02) > 0, (b & 0x01) > 0] + null_mask.append(tuple(every_num_null_mask)) + + return tuple(null_mask) + + class FlattenRowCoderImpl(StreamCoderImpl): + null_mask_search_table = generate_null_mask_search_table() + + null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01) def __init__(self, field_coders): self._field_coders = field_coders self._filed_count = len(field_coders) + self._complete_byte_num = self._filed_count // 8 + self._leading_bytes_num = self._filed_count % 8 def encode_to_stream(self, value, out_stream, nested): self.write_null_mask(value, out_stream) for i in range(self._filed_count): - if value[i] is not None: - self._field_coders[i].encode_to_stream(value[i], out_stream, nested) + item = value[i] + if item is not None: + self._field_coders[i].encode_to_stream(item, out_stream, nested) def decode_from_stream(self, in_stream, nested): - null_mask = self.read_null_mask(self._filed_count, in_stream) + null_mask = self.read_null_mask(in_stream) return [None if null_mask[idx] else self._field_coders[idx].decode_from_stream( in_stream, nested) for idx in range(0, self._filed_count)] - @staticmethod - def write_null_mask(value, out_stream):

Review comment:
Could we add some unit tests for the null mask related logic?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services