Hi Alan,

I believe that supporting asynchronous UDFs is a valuable
feature. There is already a similar FLIP [1].
Can it meet your needs?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction


Best,
Feng

On Thu, Sep 21, 2023 at 1:12 PM Alan Sheinberg
<asheinb...@confluent.io.invalid> wrote:

> Hi Ron,
>
> Thanks for your response.  I've answered some of your questions below.
>
> > I think one solution is to support Mini-Batch Lookup Join in the framework
> > layer, issuing one RPC call per batch of input rows, which can improve
> > throughput.
>
>
> Would the idea be to collect a batch and then do a single RPC (or at least
> handle a number of RPCs in a single AsyncLookupFunction call)?  That is an
> interesting idea and could simplify things.  For our use cases,
> technically, I can write an AsyncLookupFunction and utilize
> AsyncWaitOperator using unbatched RPCs and do a Lookup Join without any
> issue. My hesitation is that callers will find it unintuitive to join
> with a table when the underlying RPC is not naturally modeled as one.
> For example, it could be a text classifier
> IS_POSITIVE_SENTIMENT(...) where there's no backing table, just
> computation.
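
To make the batching idea above concrete, here is a minimal sketch in plain Java, not Flink code. MiniBatcher and its batchRpc parameter are hypothetical names introduced only for illustration; the sketch assumes the remote endpoint accepts a list of inputs and returns outputs in the same order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper: buffers rows and resolves them with one batched call. */
class MiniBatcher<I, O> {
    private final int batchSize;
    // Stand-in for the remote endpoint (an assumption): takes a batch of
    // inputs and returns the outputs in the same order.
    private final Function<List<I>, CompletableFuture<List<O>>> batchRpc;
    private final List<I> inputs = new ArrayList<>();
    private final List<CompletableFuture<O>> pending = new ArrayList<>();

    MiniBatcher(int batchSize, Function<List<I>, CompletableFuture<List<O>>> batchRpc) {
        this.batchSize = batchSize;
        this.batchRpc = batchRpc;
    }

    /** Queues one row; the returned future completes when its batch returns. */
    synchronized CompletableFuture<O> submit(I input) {
        CompletableFuture<O> result = new CompletableFuture<>();
        inputs.add(input);
        pending.add(result);
        if (inputs.size() >= batchSize) {
            flush();
        }
        return result;
    }

    /** Sends everything buffered so far as a single RPC. */
    synchronized void flush() {
        if (inputs.isEmpty()) {
            return;
        }
        List<I> batch = new ArrayList<>(inputs);
        List<CompletableFuture<O>> waiting = new ArrayList<>(pending);
        inputs.clear();
        pending.clear();
        batchRpc.apply(batch).whenComplete((outputs, error) -> {
            // Fan the single response (or failure) back out to each row's future.
            for (int i = 0; i < waiting.size(); i++) {
                if (error != null) {
                    waiting.get(i).completeExceptionally(error);
                } else {
                    waiting.get(i).complete(outputs.get(i));
                }
            }
        });
    }
}
```

A real operator would presumably also flush on a timer or at checkpoints so that a partially filled batch is not held back indefinitely; that part is omitted here.
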
>
> how does the remote function help to solve your problem?
>
>
> The problem is pretty open-ended.  There are jobs where you want to join
> data with some external data source and inject it into your pipeline, but
> others might also be offloading some computation to an external system.
> The external system might be owned by a different party, have different
> permissions, have different hardware to do a computation (e.g. train a
> model), or just block for a while.  The most intuitive invocation for this
> is just a scalar function in SQL.  You just want it to be able to run at a
> high throughput.
>
> > Regarding implementing the Remote Function, can you go into more detail
> > about your idea, how we should support it, and how users should use it,
> > from API design to semantic explanation?
>
>
> The unimplemented option I gave the most thought to is 3).  You can imagine
> an AsyncScalarFunction definition and example class like:
>
> public class AsyncScalarFunction<T> extends UserDefinedFunction {
>   @Override
>   public final FunctionKind getKind() {
>     return FunctionKind.ASYNC_SCALAR;
>   }
>
>   @Override
>   public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>     return TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
>   }
> }
>
> class MyScalarFunction extends AsyncScalarFunction<String> {
>   // Eval method with a future to use as a callback, plus arbitrary
>   // additional arguments.
>   public void eval(CompletableFuture<String> result, String input) {
>     // Example which uses an async HTTP client.
>     AsyncHttpClient httpClient = new AsyncHttpClient();
>     // Do the request and then invoke the callback depending on the outcome.
>     CompletableFuture<HttpResponse> responseFuture =
>         httpClient.doPOST(getRequestBody(input));
>     responseFuture.whenComplete((response, throwable) -> {
>       if (throwable != null) {
>         result.completeExceptionally(throwable);
>       } else {
>         result.complete(response.getBody());
>       }
>     });
>   }
>   ...
> }
>
> Then you can register it in your Flink program as with other UDFs and call
> it:
> tEnv.createTemporarySystemFunction("MY_FUNCTION", MyScalarFunction.class);
> TableResult result = tEnv.executeSql(
>     "SELECT MY_FUNCTION(input) FROM "
>         + "(SELECT i.input FROM Inputs i ORDER BY i.timestamp)");
>
> I know there are questions about SQL semantics to consider.  For example,
> does invocation of MY_FUNCTION preserve the order of the subquery above?
> To be SQL compliant, I believe it must, so the results of the async
> requests we send out must be emitted in input order, regardless of when
> they complete.  This is already implemented as an option in
> AsyncWaitOperator.  There are probably other considerations as well.
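
As a rough illustration of the ordering concern above, here is a minimal sketch in plain Java of emitting async results in submission order. It is not how AsyncWaitOperator is implemented; OrderedEmitter is a hypothetical name, and error handling is deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical buffer that emits async results in the order they were added. */
class OrderedEmitter<T> {
    private final Queue<CompletableFuture<T>> inFlight = new ArrayDeque<>();
    private final Consumer<T> downstream;

    OrderedEmitter(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Registers a pending result; the order of add() calls defines output order. */
    synchronized void add(CompletableFuture<T> future) {
        inFlight.add(future);
        future.whenComplete((value, error) -> drain());
    }

    /**
     * Emits completed results from the head of the queue only, so a slow
     * request at the head holds back later requests that already finished.
     * Failed futures are not handled in this sketch.
     */
    private synchronized void drain() {
        while (!inFlight.isEmpty() && inFlight.peek().isDone()) {
            downstream.accept(inFlight.poll().join());
        }
    }
}
```

The head-of-line blocking in drain() is the throughput cost of ordered output: results that are ready still wait for every earlier request to complete.
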
>
> I could do a similar dive into option 2) if that would also be helpful,
> though maybe this is a good starting point for conversation.
>
> Hope that addressed your questions,
> Alan
>
> On Mon, Sep 18, 2023 at 6:51 PM liu ron <ron9....@gmail.com> wrote:
>
> > Hi, Alan
> >
> > Thanks for driving this proposal. It sounds interesting.
> > Regarding implementing the Remote Function, can you go into more detail
> > about your idea, how we should support it, and how users should use it,
> > from API design to semantic explanation?and how does the remote function
> > help to solve your problem?
> >
> > I understand that your core pain point is the performance issues caused
> > by too many RPC calls. Of the three solutions you have explored,
> > regarding the Lookup Join cons:
> >
> > >> *Lookup Joins:*
> > Pros:
> > - Part of the Flink codebase
> > - High throughput
> > Cons:
> > - Unintuitive syntax
> > - Harder to do multiple remote calls per input row
> >
> > I think one solution is to support Mini-Batch Lookup Join in the framework
> > layer, issuing one RPC call per batch of input rows, which can improve
> > throughput.
> >
> > Best,
> > Ron
> >
> > On Tue, Sep 19, 2023 at 7:34 AM Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:
> >
> > > Hello all,
> > >
> > > We want to implement a custom function that sends HTTP requests to a
> > > remote endpoint using Flink SQL. Even though the function will behave
> > > like a normal UDF, the runtime would issue calls asynchronously to
> > > achieve high throughput for these remote (potentially high latency)
> > > calls. What is the community's take on implementing greater support for
> > > such functions? Any feedback is appreciated.
> > >
> > > What we have explored so far:
> > >
> > > 1.  Using a lookup join
> > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > For example:
> > > CREATE TEMPORARY TABLE RemoteTable(table_lookup_key STRING, resp STRING,
> > > PRIMARY KEY (table_lookup_key) NOT ENFORCED) WITH ('connector' =
> > > 'remote_call');
> > > SELECT i.table_lookup_key, r.resp FROM Inputs AS i JOIN RemoteTable FOR
> > > SYSTEM_TIME AS OF i.proc_time AS r ON i.table_lookup_key =
> > > r.table_lookup_key;
> > >
> > > 2.  Using a polymorphic table function. Partially supported already for
> > > window functions
> > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/>.
> > > For example:
> > > SELECT * FROM TABLE (REMOTE_CALL (Input => Table(TableToLookup) as d,
> > > Col => DESCRIPTOR("table_lookup_key")));
> > >
> > > 3.  Using an AsyncScalarFunction. Scalar functions are usually used as
> > > below, so this would require support for an async version of
> > > ScalarFunction:
> > > SELECT REMOTE_CALL(t.table_lookup_key) FROM TableToLookup t;
> > >
> > > Some pros and cons for each approach:
> > >
> > > *Lookup Joins:*
> > > Pros:
> > > - Part of the Flink codebase
> > > - High throughput
> > > Cons:
> > > - Unintuitive syntax
> > > - Harder to do multiple remote calls per input row
> > >
> > > *PTFs:*
> > > Pros:
> > > - More intuitive syntax
> > > Cons:
> > > - Need to add more support in Flink. It may exist for specialized
> > > built-in functions, but not for user-defined ones
> > >
> > > *AsyncScalarFunction:*
> > > Pros:
> > > - Most intuitive syntax
> > > - Easy to do as many calls per row input as desired
> > > Cons:
> > > - Need to add support in Flink, including a new interface with an async
> > > eval method
> > > - Out-of-order results could pose issues with SQL semantics. If we
> > > output in order, the throughput performance may suffer
> > >
> > > Thanks,
> > > Alan
> > >
> >
>
