Hi Igal,

Yes! That's exactly what I was thinking. The batching will naturally happen as the model applies backpressure. We're using pandas, and it's pretty costly to create a DataFrame and all the surrounding machinery just to process a single event. Internally the SDK has access to the batch but calls my function once per event, so we create a DataFrame for each individual event. That adds a ton of overhead: the constant factors of constructing and operating on DataFrames end up dominating our runtime.
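To make the overhead concrete, here's roughly the shape of the problem. This is just a sketch: the function names are made up, and _Model stands in for our fitted estimator.

import pandas as pd

class _Model:
    """Stand-in for our fitted estimator."""
    def predict(self, df: pd.DataFrame) -> list:
        return [0.5] * len(df)

model = _Model()

# Today: the SDK invokes us once per event, so we pay the DataFrame
# construction cost for every single record.
def score_one(event: dict) -> float:
    df = pd.DataFrame([event])
    return model.predict(df)[0]

# What we'd like: one invocation per delivered batch, so the constant
# factor is amortized and pandas can vectorize across rows.
def score_batch(events: list) -> list:
    df = pd.DataFrame(events)
    return model.predict(df)

Even modest batch sizes would amortize most of that fixed cost.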
Knowing how the SDK works, it seems like it'd be easy to do something like your example, maybe with a separate decorator for "batch functions" where the SDK just passes everything in at once.

Also, out of curiosity: are there plans to build out more introspection into StateFun's Flink state? I was thinking it would be super useful to add either queryable state or a control topic that StateFun listens to, so we could send events that introspect or modify Flink state. For example:

// control topic request
{"type": "FunctionIdsReq", "namespace": "foo", "name": "bar"}

// response
{"type": "FunctionIdsResp", "ids": ["1", "2", "3", ...]}

Or:

{"type": "SetState", "namespace": "foo", "name": "bar", "id": "1", "value": "base64bytes"}
{"type": "DeleteState", "namespace": "foo", "name": "bar", "id": "1"}

Also, OpenTracing integration where StateFun passes B3 headers with each request would be _super_ useful, so we could trace a message's route through StateFun. We'd literally be able to see the entire path of an event from ingress to egress, plus the time spent in each function. (I've put a rough sketch of what I mean at the very bottom of this mail, below the quoted thread.) Not sure if there are any plans around that, but since we're live with a StateFun project now, we could potentially contribute some of this if you're open to it.

Thanks,
Tim

On Tue, Apr 20, 2021 at 9:25 AM Igal Shilman <i...@ververica.com> wrote:

> Hi Tim!
>
> Indeed the StateFun SDK / StateFun runtime has an internal concept of
> batching, which kicks in in the presence of a slow/congested remote
> function. Keep in mind that under normal circumstances batching does not
> happen (effectively a batch of size 1 will be sent). [1]
> This batch is not currently exposed via the SDKs (both Java and Python),
> as it is an implementation detail (see [2]).
>
> The way I understand your message (please correct me if I'm wrong) is
> that evaluating the ML model is costly, and it would benefit from some
> sort of batching (as pandas does, I assume?) instead of being applied to
> every event individually.
> If this is the case, perhaps exposing this batch could be a useful
> feature to add.
>
> For example:
>
> @functions.bind_tim(..)
> def ml(context, messages: typing.List[Message]):
>     ...
>
> Let me know what you think,
> Igal.
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto#L80
> [2]
> https://github.com/apache/flink-statefun/blob/master/statefun-sdk-python/statefun/request_reply_v3.py#L219
>
> On Fri, Apr 16, 2021 at 11:48 PM Timothy Bess <tdbga...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> Is there a good way to access the batch of leads that StateFun sends to
>> the Python SDK rather than processing events one by one? We're trying to
>> run our data scientist's machine learning model through the SDK, but the
>> code is very slow when we process single events, and we don't get many
>> of the benefits of pandas, etc.
>>
>> Thanks,
>> Tim
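P.S. Here's the rough sketch of the tracing idea mentioned above. None of this exists today: it assumes StateFun forwarded B3 headers with each request-reply invocation, and that a B3-capable tracer (e.g. Jaeger or Zipkin) is registered as the global OpenTracing tracer. The header-dict argument and the function shape are made up for illustration:

import opentracing
from opentracing.propagation import Format

def ml_with_tracing(request_headers: dict, context, message):
    # request_headers would carry the (hypothetical) forwarded B3 headers,
    # e.g. x-b3-traceid / x-b3-spanid / x-b3-sampled.
    tracer = opentracing.global_tracer()
    parent_ctx = tracer.extract(Format.HTTP_HEADERS, request_headers)

    # Each function invocation becomes a child span, so the whole
    # ingress -> function -> ... -> egress path shows up as one trace.
    with tracer.start_active_span("statefun.invoke",
                                  child_of=parent_ctx) as scope:
        scope.span.set_tag("statefun.namespace", "foo")
        scope.span.set_tag("statefun.name", "bar")
        # ... evaluate the model here ...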