Dian Fu created FLINK-38615:
-------------------------------

             Summary: Optimize the Python AsyncFunction interface to make it 
more Pythonic
                 Key: FLINK-38615
                 URL: https://issues.apache.org/jira/browse/FLINK-38615
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
            Reporter: Dian Fu
            Assignee: Dian Fu


The current design is as following:
{code}
class AsyncFunction(Function, Generic[IN, OUT]):
    """
    A function to trigger Async I/O operation.

    For each #async_invoke, an async io operation can be triggered, and once it 
has been done, the
    result can be collected by calling :func:`~ResultFuture.complete`. For each 
async operation, its
    context is stored in the operator immediately after invoking #async_invoke, 
avoiding blocking
    for each stream input as long as the internal buffer is not full.

    :class:`~ResultFuture` can be passed into callbacks or futures to collect 
the result data. An
    error can also be propagated to the async IO operator by
    :func:`~ResultFuture.complete_exceptionally`.
    """

    @abstractmethod
    async def async_invoke(self, value: IN, result_future: ResultFuture[OUT]):
        pass

    def timeout(self, value: IN, result_future: ResultFuture[OUT]):
        pass
{code}

We can refactor it as following to make it more easy to use for Python users:
{code}
class AsyncFunction(Function, Generic[IN, OUT]):

    @abstractmethod
    async def async_invoke(self, value: IN) -> List[OUT]:
        pass

    def timeout(self, value: IN) -> List[OUT]:
        pass
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to