Hi Folks,
I’d like to cover my PyFlink job with tests. I haven’t found any information
on the web about writing pytest tests for PyFlink, so I read through the test
cases of PyFlink itself and concluded that I could test my
KeyedProcessFunctions in a similar way to
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/tests/test_data_stream.py
. Is this a good strategy?
When I tried to subclass PyFlinkStreamingTestCase (see code below), I got this
error:
ERROR tests/test_hello_world.py::HelloWorldTests::test_keyed_sum - TypeError:
Could not found the Java class
'org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration.Builder'.
The Java depe...
I haven’t found the JAR file in which MiniClusterResourceConfiguration is
defined. Can anybody point me in the right direction: where can I find this
class, and how do I add it to the test framework?
Best,
Kaspar
———————————————————————— test_hello_world.py ————————————————————————
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
from pyflink.common.typeinfo import Types
from pyflink.util.java_utils import get_j_env_configuration
import pytest
import sys
import pickle
from pyflink.datastream.functions import SinkFunction
from pyflink.java_gateway import get_gateway
class DataStreamTestSinkFunction(SinkFunction):
    """
    Sink used in tests to gather the records emitted by a DataStream pipeline.

    It wraps the Java ``DataStreamTestCollectSink`` obtained through the
    py4j gateway and exposes helpers to read back and reset what was collected.
    """

    def __init__(self):
        collect_sink = get_gateway().jvm \
            .org.apache.flink.python.util.DataStreamTestCollectSink()
        # Keep a direct handle on the Java sink so ``clear`` can reach it.
        self.j_data_stream_collect_sink = collect_sink
        super().__init__(sink_func=collect_sink)

    def get_results(self, is_python_object: bool = False,
                    stringify: bool = True):
        """
        Fetch the collected records from the Java sink and clear it.

        :param is_python_object: forwarded to ``collectAndClear``; when true,
            each collected item is unpickled before being returned.
        :param stringify: only used when ``is_python_object`` is true; when
            true each unpickled value is converted with ``str``.
        :return: a list of the collected items (raw, unpickled, or stringified
            depending on the flags).
        """
        collected = list(
            self.get_java_function().collectAndClear(is_python_object))
        if is_python_object:
            # NOTE: pickle.loads is applied to data coming back from the
            # test sink of the same job; do not reuse this on untrusted input.
            unpickled = (pickle.loads(item) for item in collected)
            return [str(obj) if stringify else obj for obj in unpickled]
        return collected

    def clear(self):
        """Discard anything still held by the Java sink, if there is one."""
        if self.j_data_stream_collect_sink is not None:
            self.j_data_stream_collect_sink.clear()
class HelloWorldTests(PyFlinkStreamingTestCase):
    """Runs a small keyed-sum pipeline and checks the records that reach the sink."""

    def setUp(self) -> None:
        """Configure the execution environment and create a fresh collecting sink."""
        super().setUp()
        # The right-hand side must be on the same logical line as ``config =``;
        # a bare ``config =`` followed by a newline is a syntax error.
        config = get_j_env_configuration(
            self.env._j_stream_execution_environment
        )
        config.setString("pekko.ask.timeout", "20 s")
        self.test_sink = DataStreamTestSinkFunction()

    def tearDown(self) -> None:
        """Reset the sink, then let the base class run its own teardown."""
        try:
            self.test_sink.clear()
        finally:
            # setUp chains to the base class, so tearDown does as well.
            super().tearDown()

    def assert_equals_sorted(self, expected, actual):
        """
        Assert that ``expected`` and ``actual`` hold the same items, ignoring order.

        Sorted copies are compared, so the caller's sequences are left
        untouched and any iterable (not only ``list``) is accepted.
        """
        self.assertEqual(sorted(expected), sorted(actual))

    def test_keyed_sum(self):
        """Chain three keyed ``sum`` aggregations and verify the collected output."""
        self.env.set_parallelism(1)
        ds = self.env.from_collection(
            [(1, 1), (1, 2), (1, 3), (2, 5), (2, 1)],
            type_info=Types.ROW_NAMED(["v1", "v2"], [Types.INT(), Types.INT()]),
        )

        # The expected values below correspond to running (per-record) sums:
        #   sum("v2") by v1      -> (1,1) (1,3) (1,6) (2,5) (2,6)
        #   sum(1) by field 0    -> (1,1) (1,4) (1,10) (2,5) (2,11)
        #   map swaps the fields -> (1,1) (4,1) (10,1) (5,2) (11,2)
        #   sum() by field 1     -> (1,1) (5,1) (15,1) (5,2) (16,2)
        (
            ds.key_by(lambda x: x[0])
            .sum("v2")
            .key_by(lambda x: x[0])
            .sum(1)
            .map(
                lambda x: (x[1], x[0]),
                output_type=Types.TUPLE([Types.INT(), Types.INT()]),
            )
            .key_by(lambda x: x[1])
            .sum()
            .add_sink(self.test_sink)
        )

        self.env.execute("key_by_sum_test_stream")
        results = self.test_sink.get_results(False)
        expected = ['(1,1)', '(5,1)', '(15,1)', '(5,2)', '(16,2)']
        self.assert_equals_sorted(expected, results)
# Allow running this file directly: delegate to pytest and propagate its
# exit status to the calling process.
if __name__ == "__main__":
    exit_code = pytest.main()
    sys.exit(exit_code)