Hey folks,

I'm looking for an example of creating a custom source in PyFlink. The only
one I found, in the tests, is a wrapper around a Java class:

def test_add_custom_source(self):
    custom_source = SourceFunction(
        "org.apache.flink.python.util.MyCustomSourceFunction")
    ds = self.env.add_source(
        custom_source,
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(self.test_sink)
    self.env.execute("test add custom source")
    results = self.test_sink.get_results(False)
    expected = [
        '+I[3, Mike]',
        '+I[1, Marry]',
        '+I[4, Ted]',
        '+I[5, Jack]',
        '+I[0, Bob]',
        '+I[2, Henry]']
    results.sort()
    expected.sort()
    self.assertEqual(expected, results)

Is wrapping a Java class the only way to do this as of today?

Regards,
Matyas