Hey folks, I'm looking for an example for creating a custom source in PyFlink. The one that I found in the tests is a wrapper around a java class:
def test_add_custom_source(self): custom_source = SourceFunction( "org.apache.flink.python.util.MyCustomSourceFunction") ds = self.env.add_source(custom_source, type_info=Types.ROW([Types.INT(), Types.STRING()])) ds.add_sink(self.test_sink) self.env.execute("test add custom source") results = self.test_sink.get_results(False) expected = [ '+I[3, Mike]', '+I[1, Marry]', '+I[4, Ted]', '+I[5, Jack]', '+I[0, Bob]', '+I[2, Henry]'] results.sort() expected.sort() self.assertEqual(expected, results) Is this the only way as of today? Regards, Matyas