Josh,

If you have ideas about improving the Transformer / Processor APIs, or about supporting session windows, please feel free to create a JIRA and start the discussion there. PRs are also more than welcome :)

Guozhang

On Fri, Mar 25, 2016 at 10:50 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Josh,
>
> We are aware that the Transformer / Processor APIs can be improved; for
> example, the punctuate() function should be able to return the same typed
> value R for Transformer.
>
> For now, in your case you can return a sentinel from transform, and add
> a "filter" right after it to remove the sentinel values.
>
> Guozhang
>
>
> On Wed, Mar 23, 2016 at 7:02 PM, josh gruenberg <jos...@gmail.com> wrote:
>
>> Thank you, Guozhang! In my exploration, I did overlook the "transform"
>> method; this looks promising.
>>
>> I could still use a little more help: I'm confused because for this
>> sessionization use-case, an invocation of the 'transform' method usually
>> suggests that a session is still active, so I'll have nothing to emit
>> from 'transform'. Instead, I'm guessing I'll need to produce my results
>> from the 'punctuate' callback. So my questions are:
>>
>> 1. What should I return from 'transform' to indicate that I have no
>> output at this time? From my reading of
>> 'KStreamTransformProcessor.process', it appears that "null" won't fly.
>> Should I return a dummy KeyValue, and then filter that out downstream?
>> Seems a little cumbersome, but perhaps not terrible as an interim
>> solution... Is there a better way?
>> 2. To emit completed aggregations in response to 'punctuate', can I just
>> send them via 'context.forward'? (I'll note that this doesn't appear to
>> enforce any type-safety, which could lead to maintainability issues.)
>>
>> Finally, I'll add that this pattern feels like it's abusing the
>> Transformer SPI. The interface assumes that transformation is always
>> 1:1, which is artificially limiting. I imagine some sort of
>> generalization of this part of the system could improve usability. For
>> example, both 'transform' and 'punctuate' might be reframed as void
>> methods that receive a type-safe interface for 'context.forward'.
>> (I have this small change drafted up within the kafka trunk sources, and
>> could submit a PR if the maintainers are interested?)
>>
>> Thanks,
>> -josh
>>
>> On Wed, Mar 23, 2016 at 11:02 AM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> > Hello Josh,
>> >
>> > As of now Kafka Streams does not yet support session windows as in the
>> > Dataflow model, though we do have near-term plans to support them.
>> >
>> > For now you can still work around this with the Processor API, by
>> > calling the "KStream.transform()" function, which still returns a
>> > stream object. In your customized "Transformer" implementation, you
>> > can attach a state store of your own and access it in the "transform"
>> > function, and only return results when, for example, a session has
>> > ended.
>> >
>> > As a concrete example, Confluent has some internal tools that already
>> > use Kafka Streams for some online operations, where a sessioned window
>> > processor is needed as well. We use the "transform" function in the
>> > Streams DSL (i.e. "KStreamBuilder") as in the following sketch:
>> >
>> > --------------
>> >
>> > builder.addStateStore(/* new RocksDBKeyValueStoreSupplier(..)*/,
>> > "store-name");
>> >
>> > stream1 = builder.stream("source-topic");
>> >
>> > stream2 = stream1.transform(MyTransformerFunc, "store-name");
>> >
>> > --------------
>> >
>> > then in MyTransformerFunc:
>> >
>> > public void init(ProcessorContext context) {
>> >     this.kvStore = context.getStateStore("store-name");
>> >
>> >     // now you can access this store in the transform function.
>> > }
>> >
>> > --------------
>> >
>> >
>> > Hope this helps.
>> >
>> > Guozhang
>> >
>> > On Tue, Mar 22, 2016 at 11:51 AM, josh gruenberg <jos...@gmail.com>
>> > wrote:
>> >
>> > > Hello there,
>> > >
>> > > I've been experimenting with the Kafka Streams preview, and I'm
>> > > excited about its features and capabilities! My team is enthusiastic
>> > > about the lightweight operational profile, and the support for local
>> > > state is very compelling.
>> > >
>> > > However, I'm having trouble designing a solution with KStreams to
>> > > satisfy a particular use-case: we want to "Sessionize" a stream of
>> > > events, by gathering together inputs that share a common identifier
>> > > and occur without a configurable interruption (gap) in event-time.
>> > >
>> > > This is achievable with other streaming frameworks (e.g., using
>> > > Beam/Dataflow's "Session" windows, or SparkStreaming's mapWithState
>> > > with its "timeout" capability), but I don't see how to approach it
>> > > with the current Kafka Streams API.
>> > >
>> > > I've investigated using the aggregateWithKey function, but this
>> > > doesn't appear to support data-driven windowing. I've also considered
>> > > using a custom Processor to perform the aggregation, but don't see
>> > > how to take an output-stream from a Processor and continue to work
>> > > with it. This area of the system is undocumented, so I'm not sure how
>> > > to proceed.
>> > >
>> > > Am I missing something? Do you have any suggestions?
>> > >
>> > > -josh
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
> --
> -- Guozhang
>

--
-- Guozhang
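
To make the workaround discussed in this thread concrete, here is a minimal sketch of gap-based sessionization in plain Java, with no Kafka dependency. Everything in it is an assumption for illustration: the class `GapSessionizer`, its `Session` type, and the methods `onEvent` and `onPunctuate` are hypothetical names, and a `HashMap` stands in for the state store attached to the transformer. `onEvent` plays the role of 'transform' and `onPunctuate` the role of 'punctuate'. Where the sketch returns null for "nothing to emit", a real Transformer at the time of this thread would, per the advice above, return a sentinel and filter it out downstream. Events are assumed to arrive roughly in timestamp order per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free model of the sessionization logic; not a Kafka Streams API.
class GapSessionizer {

    /** One session: its key, first and last event timestamps, and event count. */
    static final class Session {
        final String key;
        final long start;
        long end;
        int count;

        Session(String key, long timestamp) {
            this.key = key;
            this.start = timestamp;
            this.end = timestamp;
            this.count = 1;
        }
    }

    private final long gapMs;
    // Stands in for the state store that the transformer would attach.
    private final Map<String, Session> open = new HashMap<>();

    GapSessionizer(long gapMs) {
        this.gapMs = gapMs;
    }

    /**
     * Analogue of 'transform': extends the open session for this key, or, if the
     * gap has been exceeded, starts a new session and returns the one just closed.
     * Returns null when there is nothing to emit.
     */
    Session onEvent(String key, long timestamp) {
        Session current = open.get(key);
        if (current != null && timestamp - current.end <= gapMs) {
            current.end = Math.max(current.end, timestamp);
            current.count++;
            return null;
        }
        open.put(key, new Session(key, timestamp));
        return current; // null if this key had no open session
    }

    /**
     * Analogue of 'punctuate': closes and returns every session that has been
     * idle for longer than the gap as of the given time.
     */
    List<Session> onPunctuate(long now) {
        List<Session> closed = new ArrayList<>();
        Iterator<Session> it = open.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (now - s.end > gapMs) {
                closed.add(s);
                it.remove();
            }
        }
        return closed;
    }
}
```

In a real topology, the sessions returned by `onPunctuate` would be the values sent via 'context.forward', and the map would be replaced by the store obtained in 'init'.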