Hello everyone,

I have some questions about the Python API that hopefully folks in the
Apache Flink community can help with.

A little background: I’m interested in using the Python DataStream API
because our stakeholders don’t have a background in Scala/Java and
would prefer Python if possible. Our team is open to maintaining Scala
constructs on our end, but we want to expose Flink to end users for
stateful streaming through a Python API.

Questions:

1/ The docs say that custom sources and sinks cannot be defined in
Python and must be written in Java/Scala [1]. What is the recommended
approach for using custom sources/sinks written in Scala from the Python
API? If this is not currently supported, is it on the roadmap?
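
One possible approach, sketched under assumptions rather than as an official recipe: in PyFlink 1.12, `pyflink.datastream.functions.SinkFunction` accepts either a fully qualified JVM class name or a Py4J `JavaObject`. Given a string, it looks the class up through the Py4J gateway and calls its no-argument constructor. The class name `com.example.sinks.MyBigTableSink` and the helper names below are hypothetical. We also assume that the jar containing the Scala class is already on the gateway JVM's classpath, for example because the job is submitted with `flink run --python ... --jarfile <jar>`.

```python
import re

# Package-qualified JVM class name, e.g. "com.example.sinks.MyBigTableSink".
_JVM_CLASS_NAME = re.compile(r"[A-Za-z_$][\w$]*(\.[A-Za-z_$][\w$]*)+")


def is_jvm_class_name(name: str) -> bool:
    """Cheap sanity check before handing the name to Py4J."""
    return _JVM_CLASS_NAME.fullmatch(name) is not None


def add_scala_sink(ds, sink_class: str):
    """Attach a Scala/Java SinkFunction with a public no-arg constructor to a
    PyFlink DataStream (hypothetical helper).

    Assumption: the jar containing `sink_class` is on the gateway JVM classpath.
    """
    if not is_jvm_class_name(sink_class):
        raise ValueError(f"not a fully qualified class name: {sink_class!r}")
    # Imported lazily so this module can be loaded without PyFlink installed.
    from pyflink.datastream.functions import SinkFunction

    # Given a string, PyFlink resolves the class via Py4J and instantiates it
    # with no arguments; add_sink then adds the wrapped JVM sink to the job.
    return ds.add_sink(SinkFunction(sink_class))
```

Usage would look like `add_scala_sink(ds, "com.example.sinks.MyBigTableSink")`. `SourceFunction` together with `env.add_source(...)` follows the same pattern for sources.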

2/ I’ve also noticed that the Python DataStream API has several connectors
[2] that use Py4J and the Java gateway to wrap Java sources/sinks. Is
there a way for users to build their own connectors this way? What would
the process involve?
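
The connectors in [2] appear to follow a simple pattern. They build the Java object through `pyflink.java_gateway.get_gateway().jvm` and pass the resulting `JavaObject` to the `SinkFunction`/`SourceFunction` wrapper. The sketch below mirrors that pattern for a sink whose constructor takes arguments. It is a sketch, not a definitive implementation. The Scala constructor `MyBigTableSink(projectId: String, instanceId: String, tableId: String)`, the class name, `BigTableSinkConfig` and `my_bigtable_sink` are all hypothetical.

```python
from dataclasses import dataclass

# Hypothetical fully qualified name of the Scala sink class.
SCALA_SINK_CLASS = "com.example.sinks.MyBigTableSink"


@dataclass(frozen=True)
class BigTableSinkConfig:
    """Hypothetical constructor arguments of the Scala MyBigTableSink."""
    project_id: str
    instance_id: str
    table_id: str

    def __post_init__(self):
        # Fail fast in Python instead of inside the JVM constructor.
        for field_name in ("project_id", "instance_id", "table_id"):
            if not getattr(self, field_name):
                raise ValueError(f"{field_name} must be non-empty")


def my_bigtable_sink(config: BigTableSinkConfig):
    """Build a PyFlink SinkFunction wrapping the (hypothetical) Scala sink,
    in the same style as the wrappers in pyflink/datastream/connectors.py."""
    # Lazy imports: PyFlink is only needed when a job is actually built.
    from pyflink.datastream.functions import SinkFunction
    from pyflink.java_gateway import get_gateway

    jvm = get_gateway().jvm
    # Resolve the Scala class through Py4J, then call its constructor;
    # Py4J converts the Python str arguments to java.lang.String.
    j_sink_class = getattr(jvm, SCALA_SINK_CLASS)
    j_sink = j_sink_class(config.project_id, config.instance_id, config.table_id)
    return SinkFunction(j_sink)
```

Usage would look like `ds.add_sink(my_bigtable_sink(BigTableSinkConfig("my-project", "my-instance", "my-table")))`. As far as we understand, the stream reaching a JVM sink should carry an explicit type (for example `output_type=Types.ROW(...)` on the preceding `map`). Without one, records stay as pickled Python bytes that the Scala sink cannot interpret.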

Ideally, we’d like to be able to define custom sources/sinks in Scala and
use them in our Python Flink applications. For example, defining a
BigTable sink in Scala for use in the Python API:

(See the code in [3].)

Here, MyBigTableSink stands for a Scala-defined sink that has somehow been
imported into Python.

More generally, we’re interested in learning more about Scala/Python
interoperability in Flink and how we can expose the power of Flink’s Scala
APIs to Python. We’re open to any suggestions, strategies, etc.

Looking forward to any thoughts!


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks

[2]
https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py

[3] Plaintext paste of code in screenshot, in case of attachment issues:
```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
# Hypothetical: PyFlink has no MyBigTableSink; this stands for the
# Scala-defined sink we would like to import.
from pyflink.datastream.connectors import MyBigTableSink

def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    ...  # build the DataStream `ds`
    ds.add_sink(MyBigTableSink, ...)
    env.execute("Application with Custom Sink")

if __name__ == '__main__':
    example()
```
