Hi Alan,

I believe that supporting asynchronous UDFs is a valuable feature. There is
already a similar FLIP [1] available: can it meet your needs?
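
Roughly, that FLIP is about letting users define their own async table
functions for lookup joins. As a simplified, untested sketch (the class
name and the remote call below are placeholders of mine, not part of the
FLIP):

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

// Placeholder sketch: one async "RPC" per lookup key.
public class RemoteCallFunction extends AsyncTableFunction<Row> {
    public void eval(CompletableFuture<Collection<Row>> future, String key) {
        CompletableFuture
                .supplyAsync(() -> "resp-for-" + key) // stand-in for a real async call
                .whenComplete((resp, err) -> {
                    if (err != null) {
                        future.completeExceptionally(err);
                    } else {
                        future.complete(
                                Collections.singletonList(Row.of(key, resp)));
                    }
                });
    }
}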
[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

Best,
Feng

On Thu, Sep 21, 2023 at 1:12 PM Alan Sheinberg
<asheinb...@confluent.io.invalid> wrote:

> Hi Ron,
>
> Thanks for your response. I've answered some of your questions below.
>
> > I think one solution is to support mini-batch lookup joins at the
> > framework layer, doing one RPC call per batch of input rows, which can
> > improve throughput.
>
> Would the idea be to collect a batch and then do a single RPC (or at
> least handle a number of RPCs in a single AsyncLookupFunction call)?
> That is an interesting idea and could simplify things. For our use
> cases, I could technically write an AsyncLookupFunction, utilize
> AsyncWaitOperator with unbatched RPCs, and do a lookup join without any
> issue. My hesitation is that callers may find it unintuitive to join
> with a table when the underlying RPC is not really modeled that way.
> For example, it could be a text classifier
> IS_POSITIVE_SENTIMENT(...) where there's no backing table, just
> computation.
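>
> For concreteness, a minimal sketch of that AsyncLookupFunction approach
> (the class name and the supplyAsync call are stand-ins for a real async
> client, and the schema is just key/response):
>
> import java.util.Collection;
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import org.apache.flink.table.data.GenericRowData;
> import org.apache.flink.table.data.RowData;
> import org.apache.flink.table.data.StringData;
> import org.apache.flink.table.functions.AsyncLookupFunction;
>
> public class RemoteCallLookupFunction extends AsyncLookupFunction {
>     @Override
>     public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
>         String key = keyRow.getString(0).toString();
>         // Stand-in for a real async client call; one RPC per lookup key.
>         return CompletableFuture.supplyAsync(() -> "resp-for-" + key)
>                 .thenApply(resp -> Collections.<RowData>singletonList(
>                         GenericRowData.of(
>                                 StringData.fromString(key),
>                                 StringData.fromString(resp))));
>     }
> }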
>
> > how does the remote function help to solve your problem?
>
> The problem is pretty open-ended. There are jobs where you want to join
> data with some external data source and inject it into your pipeline,
> but others might also offload some computation to an external system.
> The external system might be owned by a different party, have different
> permissions, have different hardware to do a computation (e.g. train a
> model), or just block for a while. The most intuitive invocation for
> this is just a scalar function in SQL. You just want it to be able to
> run at a high throughput.
>
> > Regarding implementing the Remote Function, can you go into more
> > detail about your idea, how we should support it, and how users
> > should use it, from API design to semantics?
>
> The unimplemented option I gave the most thought to is 3). You can
> imagine an AsyncScalarFunction definition and example class like:
>
> public class AsyncScalarFunction<T> extends UserDefinedFunction {
>
>     @Override
>     public final FunctionKind getKind() {
>         return FunctionKind.ASYNC_SCALAR;
>     }
>
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInferenceExtractor.forAsyncScalarFunction(
>                 typeFactory, getClass());
>     }
> }
>
> class MyScalarFunction extends AsyncScalarFunction<String> {
>     // Eval method with a future to use as a callback, plus arbitrary
>     // additional arguments.
>     public void eval(CompletableFuture<String> result, String input) {
>         // Example which uses an async HTTP client.
>         AsyncHttpClient httpClient = new AsyncHttpClient();
>         // Do the request, then invoke the callback depending on the
>         // outcome.
>         CompletableFuture<HttpResponse> responseFuture =
>                 httpClient.doPOST(getRequestBody(input));
>         responseFuture.whenComplete((response, throwable) -> {
>             if (throwable != null) {
>                 result.completeExceptionally(throwable);
>             } else {
>                 result.complete(response.getBody());
>             }
>         });
>     }
>     ...
> }
>
> Then you can register it in your Flink program as with other UDFs and
> call it:
>
> tEnv.createTemporarySystemFunction("MY_FUNCTION", MyScalarFunction.class);
> TableResult result = tEnv.executeSql(
>     "SELECT MY_FUNCTION(input) FROM "
>         + "(SELECT i.input FROM Inputs i ORDER BY i.timestamp)");
>
> I know there are questions about SQL semantics to consider. For example,
> does invocation of MY_FUNCTION preserve the order of the subquery above?
> To be SQL compliant, I believe it must, so any async request we send out
> must be output in order, regardless of when it completes. There are
> probably other considerations as well. This, for example, is already
> implemented as an option in AsyncWaitOperator.
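>
> To illustrate, the DataStream async I/O API already exposes both modes.
> A fragment (inputStream and MyAsyncFunction are placeholders, and the
> timeout and capacity values are arbitrary):
>
> // Results are emitted in input order, potentially at some cost to
> // throughput if an early request is slow to complete:
> DataStream<String> ordered = AsyncDataStream.orderedWait(
>         inputStream, new MyAsyncFunction(), 30, TimeUnit.SECONDS, 100);
>
> // Results are emitted as calls complete, so output may be reordered:
> DataStream<String> unordered = AsyncDataStream.unorderedWait(
>         inputStream, new MyAsyncFunction(), 30, TimeUnit.SECONDS, 100);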
>
> I could do a similar dive into option 2) if that would also be helpful,
> though maybe this is a good starting point for conversation.
>
> Hope that addressed your questions,
> Alan
>
> On Mon, Sep 18, 2023 at 6:51 PM liu ron <ron9....@gmail.com> wrote:
>
> > Hi, Alan
> >
> > Thanks for driving this proposal. It sounds interesting.
> > Regarding implementing the Remote Function, can you go into more
> > detail about your idea: how we should support it, and how users
> > should use it, from API design to semantics? And how does the remote
> > function help to solve your problem?
> >
> > I understand that your core pain point is that there are performance
> > issues with too many RPC calls. Of the three solutions you have
> > explored, regarding the lookup join cons:
> >
> > >> *Lookup Joins:*
> > >> Pros:
> > >> - Part of the Flink codebase
> > >> - High throughput
> > >> Cons:
> > >> - Unintuitive syntax
> > >> - Harder to do multiple remote calls per input row
> >
> > I think one solution is to support mini-batch lookup joins at the
> > framework layer, doing one RPC call per batch of input rows, which
> > can improve throughput.
> >
> > Best,
> > Ron
> >
> > Alan Sheinberg <asheinb...@confluent.io.invalid> wrote on Tue, Sep
> > 19, 2023 at 07:34:
> >
> > > Hello all,
> > >
> > > We want to implement a custom function that sends HTTP requests to
> > > a remote endpoint using Flink SQL. Even though the function will
> > > behave like a normal UDF, the runtime would issue calls
> > > asynchronously to achieve high throughput for these remote
> > > (potentially high-latency) calls. What is the community's take on
> > > implementing greater support for such functions? Any feedback is
> > > appreciated.
> > >
> > > What we have explored so far:
> > >
> > > 1. Using a lookup join
> > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > For example:
> > >
> > > CREATE TEMPORARY TABLE RemoteTable(
> > >     table_lookup_key STRING,
> > >     resp STRING,
> > >     PRIMARY KEY (table_lookup_key) NOT ENFORCED
> > > ) WITH ('connector' = 'remote_call');
> > >
> > > SELECT i.table_lookup_key, resp
> > > FROM Inputs AS i
> > > JOIN RemoteTable FOR SYSTEM_TIME AS OF i.proc_time AS r
> > >     ON i.table_lookup_key = r.table_lookup_key;
> > >
> > > 2. Using a polymorphic table function. Partially supported already
> > > for window functions
> > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/>.
> > > For example:
> > >
> > > SELECT * FROM TABLE(
> > >     REMOTE_CALL(Input => TABLE(TableToLookup) AS d,
> > >                 Col => DESCRIPTOR("table_lookup_key")));
> > >
> > > 3. Using an AsyncScalarFunction. Scalar functions are usually used
> > > as below (thus support for an async version of ScalarFunction is
> > > required):
> > >
> > > SELECT REMOTE_CALL(t.table_lookup_key) FROM TableToLookup t;
> > >
> > > Some pros and cons of each approach:
> > >
> > > *Lookup Joins:*
> > > Pros:
> > > - Part of the Flink codebase
> > > - High throughput
> > > Cons:
> > > - Unintuitive syntax
> > > - Harder to do multiple remote calls per input row
> > >
> > > *PTFs:*
> > > Pros:
> > > - More intuitive syntax
> > > Cons:
> > > - Need to add more support in Flink. It may exist for specialized
> > >   built-in functions, but not for user-defined ones
> > >
> > > *AsyncScalarFunction:*
> > > Pros:
> > > - Most intuitive syntax
> > > - Easy to do as many calls per input row as desired
> > > Cons:
> > > - Need to add support in Flink, including a new interface with an
> > >   async eval method
> > > - Out-of-order results could pose issues with SQL semantics. If we
> > >   output in order, throughput may suffer
> > >
> > > Thanks,
> > > Alan