dianfu commented on a change in pull request #11718: [FLINK-17118][python] Support Primitive DataTypes in Cython URL: https://github.com/apache/flink/pull/11718#discussion_r408784385
########## File path: flink-python/pyflink/fn_execution/tests/test_coders_common.py ########## @@ -36,65 +36,230 @@ def check_coder(self, coder, *values): else: self.assertEqual(v, coder.decode(coder.encode(v))) + def check_cython_coder(self, python_field_coders, cython_field_coders, data): + from apache_beam.coders.coder_impl import create_InputStream, create_OutputStream + from pyflink.fn_execution import coder_impl + from pyflink.fn_execution import fast_coder_impl + from pyflink.fn_execution.fast_coder_impl import InputStreamAndFunctionWrapper + py_flatten_row_coder = coder_impl.FlattenRowCoderImpl(python_field_coders) + internal = py_flatten_row_coder.encode(data) + input_stream = create_InputStream(internal) + output_stream = create_OutputStream() + cy_flatten_row_coder = fast_coder_impl.FlattenRowCoderImpl(cython_field_coders) + value = cy_flatten_row_coder.decode_from_stream(input_stream, False) + wrapper_func_input_element = InputStreamAndFunctionWrapper( + lambda v: [v[i] for i in range(len(v))], value) + cy_flatten_row_coder.encode_to_stream(wrapper_func_input_element, output_stream, False) + generator_result = py_flatten_row_coder.decode_from_stream(create_InputStream( + output_stream.get()), False) + result = [] + for item in generator_result: + result.append(item) + try: + self.assertEqual(result, data) + except AssertionError: + self.assertEqual(len(result), len(data)) + self.assertEqual(len(result[0]), len(data[0])) + for i in range(len(data[0])): + if isinstance(data[0][i], float): + from pyflink.table.tests.test_udf import float_equal + assert float_equal(data[0][i], result[0][i], 1e-6) + else: + self.assertEqual(data[0][i], result[0][i]) + # decide whether two floats are equal @staticmethod def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0): return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol) + def skip_python_test(self): Review comment: Use unittest.skipIf ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services