Hi Gordon and Till: Thanks for pointing me to the new version! The code I'm using is for a research project so it's not on any production deadline. However I do like to know any upcoming updates so there won't be any duplicated works. Couple of questions I have now: 1. Does 3.0 support context.send(customized) from DataStream interface? If I implement a statefun application in DataStream API, is there an example on how to specify customized user types for Datastream application? 2. I noticed that the 3.0 version supports async operation as specified in this <https://github.com/apache/flink-statefun-playground/blob/8b434fcfadd78b7a83b0c09b5610967776e1a94d/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part5/asyncops/AsyncOpsShowcaseFn.java#L65>. In the older version I noticed that the async operation is done by registerAsyncOperation which ensures the function continuation is still executed by the Flink worker. It seems in the new version I can simply use Java's standard async API. Does Statefun still ensure everything within the whenComplete.() scope is executed as a Statefun message? (Let's say I uses a separate thread pool as a storage client).
Thanks for the help! Le On Tue, Mar 30, 2021 at 8:39 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote: > Hi Le, > > Thanks for reaching out with this question! It's actually a good segue to > allow me to introduce you to StateFun 3.0.0 :) > > StateFun 3.0+ comes with a new type system that would eliminate this > hassle. You can take a sneak peek here [1]. > This is part 1 of a series of tutorials on fundamentals on the upcoming > new Java SDK (you can find tutorials for other languages there as well), > and it guides you through a bit on the new type system. > > For your specific case, what you would do is implement a `Type` for your > Tuple3 messages. The `Type` contains information including a typename to > identify the data type, and a serializer for de-/serializing the data. > This `Type` can then be used when creating messages to be sent to other > functions and egresses, or used as the type specification for persisted > state values. > > If you're not in production usage already, I would highly suggest waiting > a bit for StateFun 3.0.0 as it is just around the corner with an ongoing > release candidate vote [2] and is expected to be available within 1-2 weeks. > > Let me know if this helps! > > Cheers, > Gordon > > [1] > https://github.com/apache/flink-statefun-playground/blob/dev/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part1/types/TypeSystemShowcaseFn.java > [2] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html > > On Tue, Mar 30, 2021 at 8:17 PM Till Rohrmann <trohrm...@apache.org> > wrote: > >> Hi Le, >> >> I am pulling in Gordon who might be able to help you with your question. >> >> Looking at the interface Context, it looks that you cannot easily specify >> a TypeHint for the message you want to send. Hence, I guess that you >> explicitly need to register these types. >> >> Cheers, >> Till >> >> On Tue, Mar 30, 2021 at 8:20 AM Le Xu <sharonx...@gmail.com> wrote: >> >>> Hello! >>> >>> I'm trying to figure out whether Flink Statefun supports sending object >>> with class that has generic parameter types (and potentially nested types). >>> For example, I send a message that looks like this: >>> >>> context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject, >>> listOfLongObject, Long)); >>> >>> And obviously I'm getting complaints like this: >>> >>> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract >>> TypeInformation from Class alone, because generic parameters are missing. >>> Please use TypeInformation.of(TypeHint) instead, or another equivalent >>> method in the API that accepts a TypeHint instead of a Class. For example >>> for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'. >>> at >>> org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214) >>> at >>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60) >>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127) >>> at >>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49) >>> at >>> org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100) >>> at >>> org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54) >>> at >>> org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39) >>> at >>> org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42) >>> at >>> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74) >>> at >>> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59) >>> at >>> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75) >>> at >>> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52) >>> at >>> org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36) >>> at >>> org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92) >>> at org.apache.flink.statefun.sdk.Context.send(Context.java:88) >>> at >>> benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292) >>> at >>> org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48) >>> >>> >>> Is there any API function that statefun support for parameterized class >>> like this or does the user function need to handle the serialization >>> process -- or is there anyway to quickly modify statefun message interface >>> to support this functionality. >>> >>> Thanks! >>> >>> Le >>> >>> >>> >>> >>> >>>