Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.

On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi Kevin,
>
> Thank you for your questions. Currently, users are not able to define
> custom sources/sinks in Python. It would be a great feature that could unify
> end-to-end PyFlink application development in Python, but it is a large topic
> that we have no plan to support at present.
>
> As you have noticed, `the Python DataStream API has several connectors
> [2] that use Py4J+Java gateways to interoperate with Java source/sinks`.
> These connectors extend the Python abstract classes named
> `SourceFunction` and `SinkFunction`. These two classes can accept a Java
> source/sink instance and hold it to enable interoperation between
> Python and Java. They can also accept a string containing the full name of a
> Java/Scala-defined Source/SinkFunction class and create the corresponding
> Java instance. Below are the definitions of these classes:
>
> class JavaFunctionWrapper(object):
>     """
>     A wrapper class that maintains a Function implemented in Java.
>     """
>
>     def __init__(self, j_function: Union[str, JavaObject]):
>         # TODO we should move this part to the get_java_function() to perform
>         # a lazy load.
>         if isinstance(j_function, str):
>             j_func_class = get_gateway().jvm.__getattr__(j_function)
>             j_function = j_func_class()
>         self._j_function = j_function
>
>     def get_java_function(self):
>         return self._j_function
>
>
>
> class SourceFunction(JavaFunctionWrapper):
>     """
>     Base class for all stream data sources in Flink.
>     """
>
>     def __init__(self, source_func: Union[str, JavaObject]):
>         """
>         Constructor of SourceFunction.
>
>         :param source_func: The Java SourceFunction object or the full name of the
>                             SourceFunction class.
>         """
>         super(SourceFunction, self).__init__(source_func)
>
>
> class SinkFunction(JavaFunctionWrapper):
>     """
>     The base class for SinkFunctions.
>     """
>
>     def __init__(self, sink_func: Union[str, JavaObject]):
>         """
>         Constructor of SinkFunction.
>
>         :param sink_func: The Java SinkFunction object or the full name of the
>                           SinkFunction class.
>         """
>         super(SinkFunction, self).__init__(sink_func)
>
> Therefore, you are able to define custom sources/sinks in Scala and apply
> them in Python. Here is the recommended approach for implementation:
>
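> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import SinkFunction
>
>
> class MyBigTableSink(SinkFunction):
>     def __init__(self, class_name: str):
>         # class_name is the full name of the Scala/Java SinkFunction class.
>         super(MyBigTableSink, self).__init__(class_name)
>
>
> def example():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # add_jars() expects URLs, so a local jar needs the file:// scheme.
>     env.add_jars('file:///the/path/of/your/MyBigTableSink.jar')
>     # ... (build the DataStream `ds` here)
>     ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
>     env.execute("Application with Custom Sink")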
>
>
> if __name__ == '__main__':
>     example()
>
> Remember that you must add the jar containing the Scala-defined SinkFunction
> by calling `env.add_jars()` before adding the SinkFunction. Also, your custom
> source/sink functions must extend `SourceFunction` and `SinkFunction`,
> respectively.
>
> Any further questions are welcome!
>
> Best,
> Shuiqiang
>
>
> On Wed, Mar 3, 2021 at 2:50 AM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hello everyone,
>>
>> I have some questions about the Python API that hopefully folks in the
>> Apache Flink community can help with.
>>
>> A little background: I’m interested in using the Python DataStream API
>> because of stakeholders who don’t have a background in Scala/Java and
>> would prefer Python if possible. Our team is open to maintaining Scala
>> constructs on our end; however, we are looking to expose Flink for stateful
>> streaming via a Python API to end-users.
>>
>> Questions:
>>
>> 1/ The docs mention that custom Sources and Sinks cannot be defined in
>> Python, but must be written in Java/Scala [1]. What is the recommended
>> approach for interoperating between custom sinks/sources written in Scala,
>> with the Python API? If nothing is currently supported, is it on the road
>> map?
>>
>> 2/ Also, I’ve noted that the Python DataStream API has several connectors
>> [2] that use Py4J+Java gateways to interoperate with Java source/sinks. Is
>> there a way for users to build their own connectors? What would this
>> process entail?
>>
>> Ideally, we’d like to be able to define custom sources/sinks in Scala and
>> use them in our Python API Flink Applications. For example, defining a
>> BigTable sink in Scala for use in the Python API:
>>
>>
>> [3]
>>
>> Where MyBigTableSink is just somehow importing a Scala defined sink.
>>
>> More generally, we’re interested in learning more about Scala/Python
>> interoperability in Flink, and how we can expose the power of Flink’s Scala
>> APIs to Python. Open to any suggestions, strategies, etc.
>>
>> Looking forward to any thoughts!
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
>>
>> [2]
>> https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
>>
>> [3] Plaintext paste of code in screenshot, in case of attachment issues:
>> ```
>> from pyflink.common.typeinfo import Types
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.datastream.connectors import MyBigTableSink
>>
>> def example():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     ...
>>     ds.add_sink(MyBigTableSink, ...)
>>     env.execute("Application with Custom Sink")
>>
>> if __name__ == '__main__':
>>     example()
>> ```
>>
>
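
A minimal sketch of the lazy load mentioned in the TODO comment of
`JavaFunctionWrapper.__init__` quoted above, written without PyFlink so that it
runs on its own. `LazyFunctionWrapper`, `resolver`, `FakeSink`, and `resolve`
are hypothetical names used for illustration and are not part of PyFlink; in a
real job the resolver would presumably be the `get_gateway().jvm.__getattr__`
lookup shown in the quoted code. This is only one way the TODO could be read,
not how PyFlink implements it.

```python
from typing import Any, Callable, List, Union


class LazyFunctionWrapper:
    """Holds a ready-made object, or a class name that is resolved on first use."""

    def __init__(self, function: Union[str, Any],
                 resolver: Callable[[str], Callable[[], Any]]):
        # Nothing is instantiated here; a string is only remembered for later.
        self._function = function
        self._resolver = resolver

    def get_function(self) -> Any:
        # Resolve and instantiate the class the first time it is needed,
        # then cache the instance so later calls return the same object.
        if isinstance(self._function, str):
            function_class = self._resolver(self._function)
            self._function = function_class()
        return self._function


class FakeSink:
    """Stand-in for a Java/Scala sink class (hypothetical)."""


# Stand-in for the JVM class lookup: maps full class names to classes
# and records each lookup so the laziness can be observed.
registry = {"com.mycompany.MyBigTableSink": FakeSink}
lookups: List[str] = []


def resolve(name: str) -> Callable[[], Any]:
    lookups.append(name)
    return registry[name]


wrapper = LazyFunctionWrapper("com.mycompany.MyBigTableSink", resolve)
lookups_before_use = list(lookups)   # no lookup has happened yet
sink = wrapper.get_function()        # first use triggers the lookup
same_sink = wrapper.get_function()   # cached, no second lookup
```

With this shape, constructing the wrapper never touches the gateway; the class
is looked up only when the function object is actually requested.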
