dianfu commented on a change in pull request #11118: [FLINK-16072][python] Optimize the performance of the write/read null mask
URL: https://github.com/apache/flink/pull/11118#discussion_r381768526
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##########
 @@ -24,57 +24,71 @@
 from pyflink.table import Row
 
 
+def generate_null_mask_search_table():
+    """
+    Each bit of one byte represents if the column at the specified position is 
None or not, e.g.
+    0x84 represents the first column and the sixth column are None.
+    """
+    num = 256
+    null_mask = []
+    for b in range(num):
+        every_num_null_mask = [(b & 0x80) > 0, (b & 0x40) > 0, (b & 0x20) > 0, 
(b & 0x10) > 0,
+                               (b & 0x08) > 0, (b & 0x04) > 0, (b & 0x02) > 0, 
(b & 0x01) > 0]
+        null_mask.append(tuple(every_num_null_mask))
+
+    return tuple(null_mask)
+
+
 class FlattenRowCoderImpl(StreamCoderImpl):
+    null_mask_search_table = generate_null_mask_search_table()
+
+    null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01)
 
     def __init__(self, field_coders):
         self._field_coders = field_coders
         self._filed_count = len(field_coders)
+        self._complete_byte_num = self._filed_count // 8
+        self._leading_bytes_num = self._filed_count % 8
 
     def encode_to_stream(self, value, out_stream, nested):
         self.write_null_mask(value, out_stream)
         for i in range(self._filed_count):
-            if value[i] is not None:
-                self._field_coders[i].encode_to_stream(value[i], out_stream, 
nested)
+            item = value[i]
+            if item is not None:
+                self._field_coders[i].encode_to_stream(item, out_stream, 
nested)
 
     def decode_from_stream(self, in_stream, nested):
-        null_mask = self.read_null_mask(self._filed_count, in_stream)
+        null_mask = self.read_null_mask(in_stream)
         return [None if null_mask[idx] else 
self._field_coders[idx].decode_from_stream(
             in_stream, nested) for idx in range(0, self._filed_count)]
 
-    @staticmethod
-    def write_null_mask(value, out_stream):
 
 Review comment:
   Could we add some unit tests for the null-mask-related logic?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to