A follow-up question: in the example you provided, Shuiqiang, no arguments were passed to the constructor of the custom sink/source.
What's the best way to pass arguments to the constructor?

On Fri, Mar 5, 2021 at 4:29 PM Kevin Lam <kevin....@shopify.com> wrote:

> Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.
>
> On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen <acqua....@gmail.com> wrote:
>
>> Hi Kevin,
>>
>> Thank you for your questions. Currently, users are not able to define
>> custom sources/sinks in Python. This is a great feature that would unify
>> end-to-end PyFlink application development in Python, but it is a large
>> topic that we have no plan to support at present.
>>
>> As you have noticed, the Python DataStream API has several connectors [2]
>> that use Py4J + Java gateways to interoperate with Java sources/sinks.
>> These connectors are extensions of the Python abstract classes named
>> `SourceFunction` and `SinkFunction`. These two classes can accept a Java
>> source/sink instance and maintain it to enable interoperation between
>> Python and Java. They can also accept a string containing the full name
>> of a Java/Scala-defined Source/SinkFunction class and create the
>> corresponding Java instance. Below are the definitions of these classes:
>>
>> class JavaFunctionWrapper(object):
>>     """
>>     A wrapper class that maintains a Function implemented in Java.
>>     """
>>
>>     def __init__(self, j_function: Union[str, JavaObject]):
>>         # TODO we should move this part to the get_java_function() to
>>         # perform a lazy load.
>>         if isinstance(j_function, str):
>>             j_func_class = get_gateway().jvm.__getattr__(j_function)
>>             j_function = j_func_class()
>>         self._j_function = j_function
>>
>>     def get_java_function(self):
>>         return self._j_function
>>
>>
>> class SourceFunction(JavaFunctionWrapper):
>>     """
>>     Base class for all stream data sources in Flink.
>>     """
>>
>>     def __init__(self, source_func: Union[str, JavaObject]):
>>         """
>>         Constructor of SourceFunction.
>>
>>         :param source_func: The Java SourceFunction object.
>>         """
>>         super(SourceFunction, self).__init__(source_func)
>>
>>
>> class SinkFunction(JavaFunctionWrapper):
>>     """
>>     The base class for SinkFunctions.
>>     """
>>
>>     def __init__(self, sink_func: Union[str, JavaObject]):
>>         """
>>         Constructor of SinkFunction.
>>
>>         :param sink_func: The Java SinkFunction object or the full name
>>             of the SinkFunction class.
>>         """
>>         super(SinkFunction, self).__init__(sink_func)
>>
>> Therefore, you are able to define custom sources/sinks in Scala and
>> apply them in Python. Here is the recommended approach for implementation:
>>
>> class MyBigTableSink(SinkFunction):
>>     def __init__(self, class_name: str):
>>         super(MyBigTableSink, self).__init__(class_name)
>>
>>
>> def example():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.add_jars('/the/path/of/your/MyBigTableSink.jar')
>>     # ...
>>     ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
>>     env.execute("Application with Custom Sink")
>>
>>
>> if __name__ == '__main__':
>>     example()
>>
>> Remember that you must add the jar of the Scala-defined SinkFunction by
>> calling `env.add_jars()` before adding the SinkFunction. And your custom
>> source/sink functions must be extensions of `SourceFunction` and
>> `SinkFunction`.
>>
>> Any further questions are welcome!
>>
>> Best,
>> Shuiqiang
>>
>>
>> On Wed, Mar 3, 2021 at 2:50 AM, Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Hello everyone,
>>>
>>> I have some questions about the Python API that hopefully folks in the
>>> Apache Flink community can help with.
>>>
>>> A little background: I'm interested in using the Python DataStream API
>>> because of stakeholders who don't have a background in Scala/Java and
>>> would prefer Python if possible. Our team is open to maintaining Scala
>>> constructs on our end; however, we are looking to expose Flink for
>>> stateful streaming via a Python API to end users.
>>>
>>> Questions:
>>>
>>> 1/ The docs mention that custom sources and sinks cannot be defined in
>>> Python, but must be written in Java/Scala [1]. What is the recommended
>>> approach for interoperating between custom sinks/sources written in
>>> Scala and the Python API? If nothing is currently supported, is it on
>>> the roadmap?
>>>
>>> 2/ Also, I've noted that the Python DataStream API has several
>>> connectors [2] that use Py4J + Java gateways to interoperate with Java
>>> sources/sinks. Is there a way for users to build their own connectors?
>>> What would this process entail?
>>>
>>> Ideally, we'd like to be able to define custom sources/sinks in Scala
>>> and use them in our Python API Flink applications. For example, defining
>>> a BigTable sink in Scala for use in the Python API:
>>>
>>> [3]
>>>
>>> Where MyBigTableSink is just somehow importing a Scala-defined sink.
>>>
>>> More generally, we're interested in learning more about Scala/Python
>>> interoperability in Flink, and how we can expose the power of Flink's
>>> Scala APIs to Python. Open to any suggestions, strategies, etc.
>>>
>>> Looking forward to any thoughts!
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
>>>
>>> [2] https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
>>>
>>> [3] Plaintext paste of code in screenshot, in case of attachment issues:
>>> ```
>>> from pyflink.common.typeinfo import Types
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.datastream.connectors import MyBigTableSink
>>>
>>> def example():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     ...
>>>     ds.add_sink(MyBigTableSink, ...)
>>>     env.execute("Application with Custom Sink")
>>>
>>> if __name__ == '__main__':
>>>     example()
>>> ```
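[Editor's note] On the follow-up about constructor arguments: `JavaFunctionWrapper` (quoted above) accepts either a fully qualified class name, which it instantiates with no arguments, or an already-built `JavaObject`. So one plausible route is to construct the Java instance yourself through the Py4J gateway, ideally inside a `SinkFunction` subclass's `__init__`, so that callers pass plain Python values. The sketch below imitates that dispatch in pure Python so it runs without a JVM; `FakeJvm` and `FakeScalaSink` are hypothetical stand-ins for `get_gateway().jvm` and a Scala-defined sink class, and this pattern is an inference from the wrapper's signature, not an officially documented recipe.

```python
# A runnable mock of the JavaFunctionWrapper pattern from the thread above.
# FakeJvm stands in for get_gateway().jvm; FakeScalaSink stands in for a
# Scala-defined SinkFunction. In real PyFlink the resolved classes would be
# Java classes reached through Py4J, not Python ones.

class FakeScalaSink:
    """Hypothetical Scala sink with a parameterized constructor."""
    def __init__(self, table="default"):
        self.table = table


class FakeJvm:
    """Stand-in for get_gateway().jvm: resolves fully qualified names."""
    def __init__(self, classes):
        self._classes = classes

    def __getattr__(self, name):
        # Py4J similarly resolves jvm.__getattr__("com.foo.Bar") to a class.
        return self._classes[name]


JVM = FakeJvm({"com.mycompany.MyBigTableSink": FakeScalaSink})


class JavaFunctionWrapper:
    """Mirrors the PyFlink wrapper: accepts a class name or a built object."""
    def __init__(self, j_function):
        if isinstance(j_function, str):
            # The string path always calls the NO-ARG constructor, which is
            # why constructor arguments cannot be passed this way.
            j_function = getattr(JVM, j_function)()
        self._j_function = j_function

    def get_java_function(self):
        return self._j_function


class SinkFunction(JavaFunctionWrapper):
    pass


class MyBigTableSink(SinkFunction):
    """Subclass route: take plain Python args, build the object here."""
    def __init__(self, table):
        j_sink = getattr(JVM, "com.mycompany.MyBigTableSink")(table)
        super().__init__(j_sink)


# String: no-arg construction only.
print(SinkFunction("com.mycompany.MyBigTableSink").get_java_function().table)  # default
# Subclass: constructor arguments flow through as ordinary Python values.
print(MyBigTableSink("events").get_java_function().table)                      # events
```

In real PyFlink the subclass's `__init__` would do something like `super().__init__(get_gateway().jvm.__getattr__("com.mycompany.MyBigTableSink")(table))`; the arguments must be types Py4J can convert, and the pattern is worth verifying against the PyFlink version in use.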