Hi Kevin,

Sorry for the late reply.
Actually, you are able to pass arguments to the constructor of the Java object when instantiating it in Python. Basic data types (char/boolean/int/long/float/double/string, etc.) can be passed directly, while complex types (array/list/map/POJO, etc.) must be converted to Java objects before being passed. Please refer to https://www.py4j.org/py4j_java_collections.html for more information.
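For example, a minimal sketch could look like the following. Note that `com.mycompany.MyBigTableSink`, its constructor arguments (a table name and a list of column families), and the dummy input stream are only placeholders for your own Scala/Java class and data, and the exact import locations may differ slightly between Flink versions:

from py4j.java_collections import ListConverter

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import SinkFunction
from pyflink.java_gateway import get_gateway


def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    # The jar must be registered before the Java class can be instantiated.
    env.add_jars('file:///the/path/of/your/MyBigTableSink.jar')

    # A dummy input stream, just to make the sketch self-contained.
    ds = env.from_collection(
        [('a', 1), ('b', 2)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    gateway = get_gateway()
    # Basic types (str/int/bool/...) can be handed to the Java constructor as-is.
    # Complex types have to be converted first, e.g. a Python list to a
    # java.util.List via py4j's ListConverter.
    j_column_families = ListConverter().convert(['cf1', 'cf2'], gateway._gateway_client)
    # Placeholder class and constructor -- replace with your own SinkFunction.
    j_sink = gateway.jvm.com.mycompany.MyBigTableSink('my-table', j_column_families)

    ds.add_sink(SinkFunction(j_sink))
    env.execute("Application with Custom Sink")


if __name__ == '__main__':
    example()

If you prefer, you can also hide the conversion inside a small Python subclass of `SinkFunction`, so that callers only pass plain Python values.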
Best,
Shuiqiang

Kevin Lam <kevin....@shopify.com> wrote on Thu, Mar 11, 2021 at 4:28 AM:

> A follow-up question--In the example you provided Shuiqiang, there were no
> arguments passed to the constructor of the custom sink/source.
>
> What's the best way to pass arguments to the constructor?
>
> On Fri, Mar 5, 2021 at 4:29 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.
>>
>> On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen <acqua....@gmail.com>
>> wrote:
>>
>>> Hi Kevin,
>>>
>>> Thank you for your questions. Currently, users are not able to define
>>> custom sources/sinks in Python. This is a great feature that could unify
>>> end-to-end PyFlink application development in Python, but it is a large
>>> topic that we have no plan to support at present.
>>>
>>> As you have noticed, `the Python DataStream API has several
>>> connectors [2] that use Py4J+Java gateways to interoperate with Java
>>> source/sinks`. These connectors are extensions of the Python abstract
>>> classes named `SourceFunction` and `SinkFunction`. These two classes can
>>> accept a Java source/sink instance and maintain it to enable the
>>> interoperation between Python and Java. They can also accept a string with
>>> the full name of a Java/Scala defined Source/SinkFunction class and create
>>> the corresponding Java instance. Below are the definitions of these classes:
>>>
>>> class JavaFunctionWrapper(object):
>>>     """
>>>     A wrapper class that maintains a Function implemented in Java.
>>>     """
>>>
>>>     def __init__(self, j_function: Union[str, JavaObject]):
>>>         # TODO we should move this part to the get_java_function() to perform a lazy load.
>>>         if isinstance(j_function, str):
>>>             j_func_class = get_gateway().jvm.__getattr__(j_function)
>>>             j_function = j_func_class()
>>>         self._j_function = j_function
>>>
>>>     def get_java_function(self):
>>>         return self._j_function
>>>
>>>
>>> class SourceFunction(JavaFunctionWrapper):
>>>     """
>>>     Base class for all stream data sources in Flink.
>>>     """
>>>
>>>     def __init__(self, source_func: Union[str, JavaObject]):
>>>         """
>>>         Constructor of SourceFunction.
>>>
>>>         :param source_func: The java SourceFunction object.
>>>         """
>>>         super(SourceFunction, self).__init__(source_func)
>>>
>>>
>>> class SinkFunction(JavaFunctionWrapper):
>>>     """
>>>     The base class for SinkFunctions.
>>>     """
>>>
>>>     def __init__(self, sink_func: Union[str, JavaObject]):
>>>         """
>>>         Constructor of SinkFunction.
>>>
>>>         :param sink_func: The java SinkFunction object or the full name of the
>>>                           SinkFunction class.
>>>         """
>>>         super(SinkFunction, self).__init__(sink_func)
>>>
>>> Therefore, you are able to define custom sources/sinks in Scala and
>>> apply them in Python. Here is the recommended approach for implementation:
>>>
>>> class MyBigTableSink(SinkFunction):
>>>     def __init__(self, class_name: str):
>>>         super(MyBigTableSink, self).__init__(class_name)
>>>
>>>
>>> def example():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.add_jars('/the/path/of/your/MyBigTableSink.jar')
>>>     # ...
>>>     ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
>>>     env.execute("Application with Custom Sink")
>>>
>>>
>>> if __name__ == '__main__':
>>>     example()
>>>
>>> Remember that you must add the jar of the Scala defined SinkFunction by
>>> calling `env.add_jars()` before adding the SinkFunction. And your custom
>>> source/sink classes must extend `SourceFunction` and `SinkFunction`.
>>>
>>> Any further questions are welcome!
>>>
>>> Best,
>>> Shuiqiang
>>>
>>>
>>> Kevin Lam <kevin....@shopify.com> wrote on Wed, Mar 3, 2021 at 2:50 AM:
>>>
>>>> Hello everyone,
>>>>
>>>> I have some questions about the Python API that hopefully folks in the
>>>> Apache Flink community can help with.
>>>>
>>>> A little background, I’m interested in using the Python Datastream API
>>>> because of stakeholders who don’t have a background in Scala/Java, and
>>>> would prefer Python if possible. Our team is open to maintaining Scala
>>>> constructs on our end, however we are looking to expose Flink for stateful
>>>> streaming via a Python API to end-users.
>>>>
>>>> Questions:
>>>>
>>>> 1/ The docs mention that custom Sources and Sinks cannot be defined in
>>>> Python, but must be written in Java/Scala [1]. What is the recommended
>>>> approach for interoperating between custom sinks/sources written in Scala,
>>>> with the Python API? If nothing is currently supported, is it on the road
>>>> map?
>>>>
>>>> 2/ Also, I’ve noted that the Python DataStream API has several
>>>> connectors [2] that use Py4J+Java gateways to interoperate with Java
>>>> source/sinks. Is there a way for users to build their own connectors? What
>>>> would this process entail?
>>>>
>>>> Ideally, we’d like to be able to define custom sources/sinks in Scala
>>>> and use them in our Python API Flink Applications. For example, defining a
>>>> BigTable sink in Scala for use in the Python API:
>>>>
>>>> [3]
>>>>
>>>> Where MyBigTableSink is just somehow importing a Scala defined sink.
>>>>
>>>> More generally, we’re interested in learning more about Scala/Python
>>>> interoperability in Flink, and how we can expose the power of Flink’s Scala
>>>> APIs to Python. Open to any suggestions, strategies, etc.
>>>>
>>>> Looking forward to any thoughts!
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
>>>>
>>>> [2]
>>>> https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
>>>>
>>>> [3] Plaintext paste of code in screenshot, in case of attachment issues:
>>>> ```
>>>> from pyflink.common.typeinfo import Types
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.datastream.connectors import MyBigTableSink
>>>>
>>>> def example():
>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>     ...
>>>>     ds.add_sink(MyBigTableSink, ...)
>>>>     env.execute("Application with Custom Sink")
>>>>
>>>> if __name__ == '__main__':
>>>>     example()
>>>> ```
>>>