dianfu commented on a change in pull request #11718: [FLINK-17118][python] 
Support Primitive DataTypes in Cython
URL: https://github.com/apache/flink/pull/11718#discussion_r408784385
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/tests/test_coders_common.py
 ##########
 @@ -36,65 +36,230 @@ def check_coder(self, coder, *values):
             else:
                 self.assertEqual(v, coder.decode(coder.encode(v)))
 
+    def check_cython_coder(self, python_field_coders, cython_field_coders, 
data):
+        from apache_beam.coders.coder_impl import create_InputStream, 
create_OutputStream
+        from pyflink.fn_execution import coder_impl
+        from pyflink.fn_execution import fast_coder_impl
+        from pyflink.fn_execution.fast_coder_impl import 
InputStreamAndFunctionWrapper
+        py_flatten_row_coder = 
coder_impl.FlattenRowCoderImpl(python_field_coders)
+        internal = py_flatten_row_coder.encode(data)
+        input_stream = create_InputStream(internal)
+        output_stream = create_OutputStream()
+        cy_flatten_row_coder = 
fast_coder_impl.FlattenRowCoderImpl(cython_field_coders)
+        value = cy_flatten_row_coder.decode_from_stream(input_stream, False)
+        wrapper_func_input_element = InputStreamAndFunctionWrapper(
+            lambda v: [v[i] for i in range(len(v))], value)
+        cy_flatten_row_coder.encode_to_stream(wrapper_func_input_element, 
output_stream, False)
+        generator_result = 
py_flatten_row_coder.decode_from_stream(create_InputStream(
+            output_stream.get()), False)
+        result = []
+        for item in generator_result:
+            result.append(item)
+        try:
+            self.assertEqual(result, data)
+        except AssertionError:
+            self.assertEqual(len(result), len(data))
+            self.assertEqual(len(result[0]), len(data[0]))
+            for i in range(len(data[0])):
+                if isinstance(data[0][i], float):
+                    from pyflink.table.tests.test_udf import float_equal
+                    assert float_equal(data[0][i], result[0][i], 1e-6)
+                else:
+                    self.assertEqual(data[0][i], result[0][i])
+
     # decide whether two floats are equal
     @staticmethod
     def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0):
         return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
 
+    def skip_python_test(self):
 
 Review comment:
   Use unittest.skipIf

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to