Hi Kevin,

Thank you for your questions. Currently, users are not able to define custom sources/sinks in Python. This is a great feature that could unify end-to-end PyFlink application development in Python, but it is a large topic and we have no plans to support it at present.
As you have noticed, "the Python DataStream API has several connectors [2] that use Py4J+Java gateways to interoperate with Java source/sinks". These connectors extend the Python abstract classes `SourceFunction` and `SinkFunction`. These two classes accept a Java source/sink instance and hold a reference to it, which enables interoperation between Python and Java. They can also accept a string containing the fully qualified name of a Java/Scala-defined SourceFunction/SinkFunction class, in which case they create the corresponding Java instance. Below are the definitions of these classes:

```python
from typing import Union

from py4j.java_gateway import JavaObject
from pyflink.java_gateway import get_gateway


class JavaFunctionWrapper(object):
    """
    A wrapper class that maintains a Function implemented in Java.
    """

    def __init__(self, j_function: Union[str, JavaObject]):
        # TODO we should move this part to the get_java_function() to perform a lazy load.
        if isinstance(j_function, str):
            # Look up the class by its fully qualified name and call its no-arg constructor.
            j_func_class = get_gateway().jvm.__getattr__(j_function)
            j_function = j_func_class()
        self._j_function = j_function

    def get_java_function(self):
        return self._j_function


class SourceFunction(JavaFunctionWrapper):
    """
    Base class for all stream data source in Flink.
    """

    def __init__(self, source_func: Union[str, JavaObject]):
        """
        Constructor of SourceFunction.

        :param source_func: The java SourceFunction object or the full name of the
                            SourceFunction class.
        """
        super(SourceFunction, self).__init__(source_func)


class SinkFunction(JavaFunctionWrapper):
    """
    The base class for SinkFunctions.
    """

    def __init__(self, sink_func: Union[str, JavaObject]):
        """
        Constructor of SinkFunction.

        :param sink_func: The java SinkFunction object or the full name of the
                          SinkFunction class.
        """
        super(SinkFunction, self).__init__(sink_func)
```

Therefore, you are able to define custom sources/sinks in Scala and use them in Python.
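Note that the wrapper above instantiates the Java class eagerly and always through its no-argument constructor (`j_func_class()`). A sink that needs configuration, such as a BigTable project or table name, would therefore have to be created as a `JavaObject` first and passed in directly. The following is only a sketch of the lazy-load idea from the TODO comment, extended with optional constructor arguments. `LazyJavaFunctionWrapper`, `resolve_class` and `_resolve_via_gateway` are hypothetical names, not part of PyFlink. The resolver is injectable so the logic can be exercised without a running JVM.

```python
from typing import Any, Callable, Optional, Tuple, Union

# A resolver maps a fully qualified class name to something callable that
# constructs an instance (a Py4J JavaClass in PyFlink, a plain class in tests).
ClassResolver = Callable[[str], Callable[..., Any]]


def _resolve_via_gateway(class_name: str) -> Callable[..., Any]:
    # Same lookup the PyFlink wrapper performs; imported lazily so this module
    # can be loaded without PyFlink or a running JVM.
    from pyflink.java_gateway import get_gateway
    return get_gateway().jvm.__getattr__(class_name)


class LazyJavaFunctionWrapper(object):
    """Hypothetical variant of JavaFunctionWrapper (not a PyFlink API).

    When given a class name, the Java object is created on the first call to
    get_java_function() and then cached; constructor arguments are forwarded.
    """

    def __init__(self, j_function: Union[str, Any], *ctor_args: Any,
                 resolve_class: Optional[ClassResolver] = None):
        if ctor_args and not isinstance(j_function, str):
            raise ValueError("constructor arguments require a class name")
        self._j_function = j_function
        self._ctor_args: Tuple[Any, ...] = ctor_args
        self._resolve_class = resolve_class or _resolve_via_gateway
        # An already-constructed object needs no resolution.
        self._resolved = not isinstance(j_function, str)

    def get_java_function(self) -> Any:
        if not self._resolved:
            j_func_class = self._resolve_class(self._j_function)
            self._j_function = j_func_class(*self._ctor_args)
            self._resolved = True
        return self._j_function
```

Because `SinkFunction` also accepts a `JavaObject`, the object returned by `get_java_function()` could be handed straight to it.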
Here is the recommended approach for the implementation:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import SinkFunction


class MyBigTableSink(SinkFunction):

    def __init__(self, class_name: str):
        super(MyBigTableSink, self).__init__(class_name)


def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    # Jar paths are passed as URLs, hence the file:// scheme.
    env.add_jars('file:///the/path/of/your/MyBigTableSink.jar')
    # ...
    ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
    env.execute("Application with Custom Sink")


if __name__ == '__main__':
    example()
```

Remember that you must add the jar containing the Scala-defined SinkFunction by calling `env.add_jars()` before adding the sink. Also, your custom source/sink classes must extend `SourceFunction` and `SinkFunction`. Any further questions are welcome!

Best,
Shuiqiang

Kevin Lam <kevin....@shopify.com> wrote on Wed, Mar 3, 2021 at 2:50 AM:

> Hello everyone,
>
> I have some questions about the Python API that hopefully folks in the
> Apache Flink community can help with.
>
> A little background, I’m interested in using the Python Datastream API
> because of stakeholders who don’t have a background in Scala/Java, and
> would prefer Python if possible. Our team is open to maintaining Scala
> constructs on our end, however we are looking to expose Flink for stateful
> streaming via a Python API to end-users.
>
> Questions:
>
> 1/ The docs mention that custom Sources and Sinks cannot be defined in
> Python, but must be written in Java/Scala [1]. What is the recommended
> approach for interoperating between custom sinks/sources written in Scala,
> with the Python API? If nothing is currently supported, is it on the road
> map?
>
> 2/ Also, I’ve noted that the Python DataStream API has several connectors
> [2] that use Py4J+Java gateways to interoperate with Java source/sinks. Is
> there a way for users to build their own connectors? What would this
> process entail?
>
> Ideally, we’d like to be able to define custom sources/sinks in Scala and
> use them in our Python API Flink Applications.
> For example, defining a
> BigTable sink in Scala for use in the Python API:
>
> [3]
>
> Where MyBigTableSink is just somehow importing a Scala defined sink.
>
> More generally, we’re interested in learning more about Scala/Python
> interoperability in Flink, and how we can expose the power of Flink’s Scala
> APIs to Python. Open to any suggestions, strategies, etc.
>
> Looking forward to any thoughts!
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
>
> [2]
> https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
>
> [3] Plaintext paste of code in screenshot, in case of attachment issues:
> ```
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import MyBigTableSink
>
> def example():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     ...
>     ds.add_sink(MyBigTableSink, ...)
>     env.execute("Application with Custom Sink")
>
> if __name__ == '__main__':
>     example()
> ```
>