Hi folks, I’d like to cover my PyFlink job with tests. I haven’t found any information on the web about writing pytest tests for PyFlink, so I read through PyFlink’s own test cases and concluded that I could test my KeyedProcessFunctions the same way as in https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/tests/test_data_stream.py. Is this a good strategy?
# When I tried to subclass PyFlinkStreamingTestCase (see the code below), I got
# this error:
#
#     ERROR tests/test_hello_world.py::HelloWorldTests::test_keyed_sum -
#     TypeError: Could not found the Java class
#     'org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration.Builder'.
#     The Java depe...
#
# I haven't found the JAR file in which MiniClusterResourceConfiguration is
# defined. Can anybody point me in the right direction: where can I find this
# class, and how do I add it to the test framework?
#
# Best,
# Kaspar
#
# ------------------------------------------------------------------------------
# test_hello_world.py
# ------------------------------------------------------------------------------
"""Tests for the PyFlink job, modelled on PyFlink's own
``pyflink/datastream/tests/test_data_stream.py``.

NOTE(review): ``PyFlinkStreamingTestCase`` (which needs
``org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration``) and
``DataStreamTestSinkFunction`` (which needs
``org.apache.flink.python.util.DataStreamTestCollectSink``) both look up Java
classes through the Py4J gateway. The error above shows the first one is not
on the JVM classpath. Presumably both come from Flink's *test* artifacts
rather than the regular PyFlink distribution, and must be added to the
gateway's classpath before these tests can run. Confirm this for your Flink
version.
"""
import pickle
import sys

import pytest

from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import SinkFunction
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
from pyflink.util.java_utils import get_j_env_configuration


class DataStreamTestSinkFunction(SinkFunction):
    """A util class to collect test DataStream transformation results.

    Wraps the Java sink ``org.apache.flink.python.util.DataStreamTestCollectSink``
    (created through the Py4J gateway) so a test can read back what the
    pipeline emitted.
    """

    def __init__(self):
        self.j_data_stream_collect_sink = (
            get_gateway().jvm.org.apache.flink.python.util.DataStreamTestCollectSink()
        )
        super().__init__(sink_func=self.j_data_stream_collect_sink)

    def get_results(self, is_python_object: bool = False, stringify: bool = True):
        """Return the records collected by the sink.

        Records are fetched through the Java sink's ``collectAndClear``.

        :param is_python_object: forwarded to ``collectAndClear``. When True,
            each record is treated as a pickled Python object and unpickled.
        :param stringify: only used when ``is_python_object`` is True. If set,
            each unpickled object is converted with ``str``.
        :return: a list of the collected records.
        """
        j_results = self.get_java_function().collectAndClear(is_python_object)
        results = list(j_results)
        if not is_python_object:
            return results
        # The payloads are produced by the job under test, so unpickling is
        # trusted here. Never reuse this on data from an external source.
        objects = [pickle.loads(result) for result in results]
        if stringify:
            return [str(obj) for obj in objects]
        return objects

    def clear(self):
        """Clear the Java sink's collected records; do nothing if the sink is unset."""
        if self.j_data_stream_collect_sink is None:
            return
        self.j_data_stream_collect_sink.clear()


class HelloWorldTests(PyFlinkStreamingTestCase):
    """Run small DataStream pipelines and check the records they emit."""

    def setUp(self) -> None:
        super().setUp()
        config = get_j_env_configuration(self.env._j_stream_execution_environment)
        # NOTE(review): the 'pekko.*' key prefix depends on the Flink version.
        # Older releases used 'akka.ask.timeout'; confirm for your version.
        config.setString("pekko.ask.timeout", "20 s")
        self.test_sink = DataStreamTestSinkFunction()

    def tearDown(self) -> None:
        try:
            self.test_sink.clear()
        finally:
            # Always give the base class a chance to run its own cleanup.
            super().tearDown()

    def assert_equals_sorted(self, expected, actual):
        """Assert that *expected* and *actual* hold the same elements in any order.

        Sorted copies are compared, so the caller's lists are not modified.
        """
        self.assertEqual(sorted(expected), sorted(actual))

    def test_keyed_sum(self):
        """Chain three keyed rolling sums and check every emitted record."""
        self.env.set_parallelism(1)
        ds = self.env.from_collection(
            [(1, 1), (1, 2), (1, 3), (2, 5), (2, 1)],
            type_info=Types.ROW_NAMED(["v1", "v2"], [Types.INT(), Types.INT()]),
        )

        # Each rolling sum emits one record per input:
        #   sum("v2") keyed by x[0]: (1,1) (1,3) (1,6)  (2,5) (2,6)
        #   sum(1)    keyed by x[0]: (1,1) (1,4) (1,10) (2,5) (2,11)
        #   map swaps the fields:    (1,1) (4,1) (10,1) (5,2) (11,2)
        #   sum()     keyed by x[1]: (1,1) (5,1) (15,1) (5,2) (16,2)
        (
            ds.key_by(lambda x: x[0])
            .sum("v2")
            .key_by(lambda x: x[0])
            .sum(1)
            .map(lambda x: (x[1], x[0]), output_type=Types.TUPLE([Types.INT(), Types.INT()]))
            .key_by(lambda x: x[1])
            .sum()
            .add_sink(self.test_sink)
        )

        self.env.execute("key_by_sum_test_stream")
        results = self.test_sink.get_results(False)
        expected = ['(1,1)', '(5,1)', '(15,1)', '(5,2)', '(16,2)']
        self.assert_equals_sorted(expected, results)


if __name__ == "__main__":
    # A bare pytest.main() reads sys.argv and, with no arguments, collects from
    # the current directory. Target this file, but still forward any CLI flags.
    sys.exit(pytest.main([__file__, *sys.argv[1:]]))