Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.

On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen <acqua....@gmail.com> wrote:
> Hi Kevin,
>
> Thank you for your questions. Currently, users are not able to define
> custom sources/sinks in Python. That would be a great feature that could unify
> end-to-end PyFlink application development in Python, but it is a large topic
> and we have no plans to support it at present.
>
> As you noticed, the Python DataStream API has several connectors [2] that
> use Py4J+Java gateways to interoperate with Java sources/sinks. These
> connectors extend the Python abstract classes `SourceFunction` and
> `SinkFunction`. These two classes can accept a Java source/sink instance and
> hold on to it, which enables interoperation between Python and Java. They can
> also accept a string containing the fully qualified name of a Java/Scala-defined
> SourceFunction/SinkFunction class, in which case they create the corresponding
> Java instance. Below are the definitions of these classes:
>
> from typing import Union
>
> from py4j.java_gateway import JavaObject
> from pyflink.java_gateway import get_gateway
>
>
> class JavaFunctionWrapper(object):
>     """
>     A wrapper class that maintains a Function implemented in Java.
>     """
>
>     def __init__(self, j_function: Union[str, JavaObject]):
>         # TODO we should move this part to the get_java_function() to perform a lazy load.
>         if isinstance(j_function, str):
>             j_func_class = get_gateway().jvm.__getattr__(j_function)
>             j_function = j_func_class()
>         self._j_function = j_function
>
>     def get_java_function(self):
>         return self._j_function
>
>
> class SourceFunction(JavaFunctionWrapper):
>     """
>     Base class for all stream data sources in Flink.
>     """
>
>     def __init__(self, source_func: Union[str, JavaObject]):
>         """
>         Constructor of SourceFunction.
>
>         :param source_func: The java SourceFunction object.
>         """
>         super(SourceFunction, self).__init__(source_func)
>
>
> class SinkFunction(JavaFunctionWrapper):
>     """
>     The base class for SinkFunctions.
>     """
>
>     def __init__(self, sink_func: Union[str, JavaObject]):
>         """
>         Constructor of SinkFunction.
>
>         :param sink_func: The java SinkFunction object or the full name of the
>             SinkFunction class.
>         """
>         super(SinkFunction, self).__init__(sink_func)
>
> Therefore, you can define custom sources/sinks in Scala and use them in
> Python. Here is the recommended approach:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import SinkFunction
>
>
> class MyBigTableSink(SinkFunction):
>     def __init__(self, class_name: str):
>         super(MyBigTableSink, self).__init__(class_name)
>
>
> def example():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # add_jars() expects jar URLs, i.e. with a file:// scheme for local files
>     env.add_jars('file:///the/path/of/your/MyBigTableSink.jar')
>     # ...
>     ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
>     env.execute("Application with Custom Sink")
>
>
> if __name__ == '__main__':
>     example()
>
> Remember that you must add the jar containing the Scala-defined SinkFunction
> by calling `env.add_jars()` before adding the sink. Also, your Python wrappers
> for custom sources/sinks must extend `SourceFunction` or `SinkFunction`.
>
> Any further questions are welcome!
>
> Best,
> Shuiqiang
>
>
> On Wed, Mar 3, 2021 at 2:50 AM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hello everyone,
>>
>> I have some questions about the Python API that hopefully folks in the
>> Apache Flink community can help with.
>>
>> A little background: I’m interested in using the Python DataStream API
>> because of stakeholders who don’t have a background in Scala/Java and
>> would prefer Python if possible. Our team is open to maintaining Scala
>> constructs on our end; however, we are looking to expose Flink for stateful
>> streaming via a Python API to end users.
>>
>> Questions:
>>
>> 1/ The docs mention that custom Sources and Sinks cannot be defined in
>> Python, but must be written in Java/Scala [1]. What is the recommended
>> approach for interoperating between custom sinks/sources written in Scala
>> and the Python API? If nothing is currently supported, is it on the
>> roadmap?
>>
>> 2/ Also, I’ve noted that the Python DataStream API has several connectors
>> [2] that use Py4J+Java gateways to interoperate with Java sources/sinks.
>> Is there a way for users to build their own connectors? What would this
>> process entail?
>>
>> Ideally, we’d like to be able to define custom sources/sinks in Scala and
>> use them in our Python API Flink applications. For example, defining a
>> BigTable sink in Scala for use in the Python API:
>>
>> [3]
>>
>> Here, MyBigTableSink would somehow import a Scala-defined sink.
>>
>> More generally, we’re interested in learning more about Scala/Python
>> interoperability in Flink, and how we can expose the power of Flink’s Scala
>> APIs to Python. Open to any suggestions, strategies, etc.
>>
>> Looking forward to any thoughts!
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
>>
>> [2]
>> https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
>>
>> [3] Plaintext paste of code in screenshot, in case of attachment issues:
>> ```
>> from pyflink.common.typeinfo import Types
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.datastream.connectors import MyBigTableSink
>>
>> def example():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     ...
>>     ds.add_sink(MyBigTableSink, ...)
>>     env.execute("Application with Custom Sink")
>>
>> if __name__ == '__main__':
>>     example()
>> ```
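
The `TODO` in `JavaFunctionWrapper` notes that the Java class is looked up and instantiated eagerly in `__init__`, so the class must already be resolvable when the Python wrapper is constructed. Below is a minimal sketch of the lazy variant that the TODO describes, written in plain Python so it runs without a JVM. `LazyFunctionWrapper`, `Resolver`, `FakeSink` and `fake_resolver` are hypothetical names, not PyFlink API; the injected resolver is an assumption standing in for `get_gateway().jvm.__getattr__`.

```python
from typing import Any, Callable, Optional, Union

# Hypothetical resolver type: maps a fully qualified class name to a
# zero-argument constructor, standing in for get_gateway().jvm.__getattr__.
Resolver = Callable[[str], Callable[[], Any]]


class LazyFunctionWrapper(object):
    """Hypothetical lazy variant of JavaFunctionWrapper (not PyFlink API)."""

    def __init__(self, j_function: Union[str, Any], resolver: Resolver):
        self._resolver = resolver
        self._class_name: Optional[str] = None
        self._j_function: Optional[Any] = None
        if isinstance(j_function, str):
            # Only remember the class name; do not look it up yet.
            self._class_name = j_function
        else:
            # An already-constructed object is used as-is.
            self._j_function = j_function

    def get_java_function(self) -> Any:
        # Resolve and instantiate on first use, then cache the instance.
        if self._j_function is None:
            j_func_class = self._resolver(self._class_name)
            self._j_function = j_func_class()
        return self._j_function


# Usage with a fake resolver that records lookups instead of calling a JVM.
class FakeSink(object):
    pass


lookups = []


def fake_resolver(name: str) -> Callable[[], Any]:
    lookups.append(name)
    return FakeSink


wrapper = LazyFunctionWrapper("com.mycompany.MyBigTableSink", fake_resolver)
print(lookups)  # → []
sink = wrapper.get_java_function()
print(lookups)  # → ['com.mycompany.MyBigTableSink']
```

The trade-off of deferring the lookup is that a misspelled class name or a missing jar surfaces at first use rather than when the wrapper is built.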