Josh,

If you have ideas for improving the Transformer / Processor APIs, or for
supporting sessioned windows, please feel free to create a JIRA and start
the discussion there. PRs are more than welcome, too :)
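
In the meantime, the sentinel-plus-filter workaround quoted below can be
sketched roughly as follows. This is a minimal plain-Java illustration, not
Kafka Streams code: the class SessionSketch, the NO_OUTPUT sentinel, the gapMs
parameter, and the HashMap standing in for a state store are all hypothetical
names made up for this example. It only closes a session when a later event
for the same key arrives after the gap; sessions that simply go quiet would
still need to be flushed from something like punctuate().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Transformer with an attached state store.
class SessionSketch {
    // Assumed sentinel meaning "no output for this record".
    static final String NO_OUTPUT = "__NO_OUTPUT__";

    private final long gapMs;
    // Stand-in for the state store: key -> {eventCount, lastTimestampMs}.
    private final Map<String, long[]> store = new HashMap<>();

    SessionSketch(long gapMs) {
        this.gapMs = gapMs;
    }

    // Analogue of transform(): returns "key:count" for a session that just
    // ended, or the sentinel while the session is still active.
    String transform(String key, long timestampMs) {
        long[] session = store.get(key);
        if (session != null && timestampMs - session[1] > gapMs) {
            // Gap exceeded: emit the old session and start a new one.
            store.put(key, new long[] {1L, timestampMs});
            return key + ":" + session[0];
        }
        if (session == null) {
            store.put(key, new long[] {1L, timestampMs});
        } else {
            session[0]++;
            session[1] = timestampMs;
        }
        return NO_OUTPUT;
    }

    // Analogue of the "filter" placed right after transform().
    static List<String> dropSentinels(List<String> outputs) {
        List<String> kept = new ArrayList<>();
        for (String o : outputs) {
            if (!NO_OUTPUT.equals(o)) {
                kept.add(o);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        SessionSketch sketch = new SessionSketch(1000L);
        List<String> raw = new ArrayList<>();
        raw.add(sketch.transform("a", 0L));    // session starts
        raw.add(sketch.transform("a", 500L));  // same session
        raw.add(sketch.transform("a", 2000L)); // gap > 1000 ms, session closes
        System.out.println(dropSentinels(raw)); // prints [a:2]
    }
}
```

With a real Transformer the same idea would apply per record, with the
sentinel removed by a filter step on the stream returned from transform().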

Guozhang

On Fri, Mar 25, 2016 at 10:50 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Josh,
>
> We are aware that the Transformer / Processor APIs can be improved; for
> example, the punctuate() function of Transformer should be able to return a
> value of the same type R as transform().
>
> For now, in your case you can return a sentinel value from transform() and
> add a "filter" right after it to remove the sentinel values.
>
> Guozhang
>
>
> On Wed, Mar 23, 2016 at 7:02 PM, josh gruenberg <jos...@gmail.com> wrote:
>
>> Thank you, Guozhang! In my exploration, I did overlook the "transform"
>> method; this looks promising.
>>
>> I could still use a little more help: I'm confused because for this
>> sessionization use-case, an invocation of the 'transform' method usually
>> suggests that a session is still active, so I'll have nothing to emit from
>> 'transform'. Instead, I'm guessing I'll need to produce my results from the
>> 'punctuate' callback. So my questions are:
>>
>> 1. what should I return from 'transform' to indicate that I have no output
>> at this time? From my reading of 'KStreamTransformProcessor.process', it
>> appears that "null" won't fly. Should I return a dummy KeyValue, and then
>> filter that out downstream? Seems a little cumbersome, but perhaps not
>> terrible as an interim solution... Is there a better way?
>> 2. To emit completed aggregations in response to 'punctuate', can I just
>> send them via 'context.forward'? (I'll note that this doesn't appear to
>> enforce any type-safety, which could lead to maintainability issues.)
>>
>> Finally, I'll add that this pattern feels like it's abusing the Transformer
>> SPI. The interface assumes that transformation is always 1:1, which is
>> artificially limiting. I imagine some sort of generalization of this part
>> of the system could improve usability. For example, both 'transform' and
>> 'punctuate' might be reframed as void methods that receive a type-safe
>> interface for 'context.forward'. (I have this small change drafted up
>> within the kafka trunk sources, and could submit a PR if the maintainers
>> are interested?)
>>
>> Thanks,
>> -josh
>>
>> On Wed, Mar 23, 2016 at 11:02 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hello Josh,
>> >
>> > As of now, Kafka Streams does not yet support session windows as in the
>> > Dataflow model, though we do have near-term plans to support them.
>> >
>> > For now you can work around it with the Processor API by calling the
>> > "KStream.transform()" function, which still returns a stream object.
>> > In your customized "Transformer" implementation, you can attach a state
>> > store of your own, access it in the "transform" function, and only
>> > return results when, for example, a session has ended.
>> >
>> > As a concrete example, Confluent has some internal tools that already use
>> > Kafka Streams for some online operations, where a sessioned window
>> > processor is needed as well. We use the "transform" function in the
>> > Streams DSL (i.e. "KStreamBuilder") as in the following sketch:
>> >
>> > --------------
>> >
>> > builder.addStateStore(/* new RocksDBKeyValueStoreSupplier(..) */, "store-name");
>> >
>> > stream1 = builder.stream("source-topic");
>> >
>> > // transform() returns a stream, so the result can be processed further
>> > stream2 = stream1.transform(MyTransformerFunc, "store-name");
>> >
>> > --------------
>> >
>> > then in MyTransformerFunc:
>> >
>> > public void init(ProcessorContext context) {
>> >     this.kvStore = context.getStateStore("store-name");
>> >
>> >     // now you can access this store in the transform function.
>> > }
>> >
>> > --------------
>> >
>> >
>> > Hope this helps.
>> >
>> > Guozhang
>> >
>> > On Tue, Mar 22, 2016 at 11:51 AM, josh gruenberg <jos...@gmail.com> wrote:
>> >
>> > > Hello there,
>> > >
>> > > I've been experimenting with the Kafka Streams preview, and I'm excited
>> > > about its features and capabilities! My team is enthusiastic about the
>> > > lightweight operational profile, and the support for local state is very
>> > > compelling.
>> > >
>> > > However, I'm having trouble designing a solution with KStreams to satisfy
>> > > a particular use-case: we want to "Sessionize" a stream of events, by
>> > > gathering together inputs that share a common identifier and occur without
>> > > a configurable interruption (gap) in event-time.
>> > >
>> > > This is achievable with other streaming frameworks (eg, using
>> > > Beam/Dataflow's "Session" windows, or SparkStreaming's mapWithState with
>> > > its "timeout" capability), but I don't see how to approach it with the
>> > > current Kafka Streams API.
>> > >
>> > > I've investigated using the aggregateWithKey function, but this doesn't
>> > > appear to support data-driven windowing. I've also considered using a
>> > > custom Processor to perform the aggregation, but don't see how to take an
>> > > output-stream from a Processor and continue to work with it. This area of
>> > > the system is undocumented, so I'm not sure how to proceed.
>> > >
>> > > Am I missing something? Do you have any suggestions?
>> > >
>> > > -josh
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
