Hi Folks,

I’d like to cover my PyFlink job with tests. I haven’t found any information 
on the web about writing pytest tests for PyFlink, so I read through the test 
cases of PyFlink itself and concluded that I could test my 
KeyedProcessFunctions in a similar way to 
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/tests/test_data_stream.py
 .  Is this a good strategy? 


When I tried to subclass PyFlinkStreamingTestCase (see code below), I got this 
error:

ERROR tests/test_hello_world.py::HelloWorldTests::test_keyed_sum - TypeError: 
Could not found the Java class 
'org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration.Builder'. 
The Java depe...

I haven’t found the JAR file in which MiniClusterResourceConfiguration is 
defined. Can anybody point me in the right direction on where to find this 
class and how to add it to the test framework?

Best,
Kaspar



————————————————————————  test_hello_world.py ————————————————————————



from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
from pyflink.common.typeinfo import Types
from pyflink.util.java_utils import get_j_env_configuration
import pytest
import sys

import pickle

from pyflink.datastream.functions import SinkFunction
from pyflink.java_gateway import get_gateway


class DataStreamTestSinkFunction(SinkFunction):
    """
    Sink used in tests to gather the records emitted by a DataStream pipeline.

    It wraps the Java ``DataStreamTestCollectSink`` obtained through the
    gateway and exposes helpers to read back and to reset what was collected.
    """

    def __init__(self):
        jvm = get_gateway().jvm
        self.j_data_stream_collect_sink = \
            jvm.org.apache.flink.python.util.DataStreamTestCollectSink()
        super().__init__(sink_func=self.j_data_stream_collect_sink)

    def get_results(self, is_python_object: bool = False, stringify: bool = True):
        """
        Return the collected records as a list.

        The records are fetched with ``collectAndClear(is_python_object)`` on
        the underlying Java function.

        :param is_python_object: When False the fetched values are returned
                                 unchanged; when True every value is unpickled
                                 first.
        :param stringify: Only used when ``is_python_object`` is True: convert
                          each unpickled value with ``str()`` before returning.
        :return: A list with one entry per collected record.
        """
        collected = list(self.get_java_function().collectAndClear(is_python_object))
        if not is_python_object:
            return collected
        if stringify:
            return [str(pickle.loads(raw)) for raw in collected]
        return [pickle.loads(raw) for raw in collected]

    def clear(self):
        """Clear the wrapped Java collect sink, if there is one."""
        if self.j_data_stream_collect_sink is not None:
            self.j_data_stream_collect_sink.clear()



class HelloWorldTests(PyFlinkStreamingTestCase):
    """
    Example DataStream test case built on ``PyFlinkStreamingTestCase``.

    Every test attaches ``self.test_sink`` to its pipeline, executes the job
    through ``self.env`` and compares the collected records with the expected
    ones.
    """

    def setUp(self) -> None:
        """Configure the execution environment and create a fresh test sink."""
        super().setUp()
        # NOTE: the assignment has to stay on one logical line; a bare
        # ``config =`` followed by a line break is a SyntaxError.
        config = get_j_env_configuration(self.env._j_stream_execution_environment)
        config.setString("pekko.ask.timeout", "20 s")
        self.test_sink = DataStreamTestSinkFunction()

    def tearDown(self) -> None:
        """Reset the collect sink, then run the base class clean-up."""
        self.test_sink.clear()
        super().tearDown()

    def assert_equals_sorted(self, expected, actual) -> None:
        """
        Assert that both sequences hold the same elements, ignoring order.

        Sorted copies are compared, so the arguments are left unmodified.

        :param expected: The expected elements.
        :param actual: The elements produced by the job under test.
        """
        self.assertEqual(sorted(expected), sorted(actual))

    def test_keyed_sum(self):
        """Chain three keyed ``sum`` aggregations and check the emitted records."""
        self.env.set_parallelism(1)
        ds = self.env.from_collection(
            [(1, 1), (1, 2), (1, 3), (2, 5), (2, 1)],
            type_info=Types.ROW_NAMED(["v1", "v2"], [Types.INT(), Types.INT()])
        )

        # sum by field name, then by position, then (after swapping the two
        # fields in map) with the default position.
        ds.key_by(lambda x: x[0]) \
            .sum("v2") \
            .key_by(lambda x: x[0]) \
            .sum(1) \
            .map(lambda x: (x[1], x[0]),
                 output_type=Types.TUPLE([Types.INT(), Types.INT()])) \
            .key_by(lambda x: x[1]) \
            .sum() \
            .add_sink(self.test_sink)

        self.env.execute("key_by_sum_test_stream")
        results = self.test_sink.get_results(False)
        expected = ['(1,1)', '(5,1)', '(15,1)', '(5,2)', '(16,2)']
        self.assert_equals_sorted(expected, results)
        
        

if __name__ == "__main__":
    # Run pytest and hand its return value to the shell as the exit status.
    exit_code = pytest.main()
    sys.exit(exit_code)

Reply via email to