Hello Trey,

Just to clarify your solution: is "calls-store" the same store that is
materialized from the calls table, as in
val callsTable = builder.table("calls", ... Materialized.as(.. "calls-store"))?
If so, you are basically using the transformer to update the original
materialized store for callsTable directly, is that right?
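
For concreteness, here is a minimal sketch of how I am reading your
description; please correct me if it differs. This is an assumption on my
part, not your actual code: the class and function names are taken from your
pseudo code, while the type parameters, the cast in init() and the use of
TransformerSupplier are my guesses against the 2.1.0 API.

```kotlin
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.kstream.TransformerSupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Writes every record into the named store, then passes it on unchanged.
class WriteThroughTransformer<K, V>(
    private val storeName: String
) : Transformer<K, V, KeyValue<K, V>> {

    private lateinit var store: KeyValueStore<K, V>

    @Suppress("UNCHECKED_CAST")
    override fun init(context: ProcessorContext) {
        // Assumption: the store was registered as a plain key/value store
        // whose key and value types match this stream's K and V.
        store = context.getStateStore(storeName) as KeyValueStore<K, V>
    }

    override fun transform(key: K, value: V): KeyValue<K, V> {
        store.put(key, value)            // update the store directly
        return KeyValue.pair(key, value) // forward the record downstream
    }

    override fun close() {
        // Nothing to release; the store is managed by Kafka Streams.
    }
}

// Extension function so the step reads like any other DSL operator.
fun <K, V> KStream<K, V>.writeThrough(storeName: String): KStream<K, V> =
    transform(
        TransformerSupplier<K, V, KeyValue<K, V>> { WriteThroughTransformer<K, V>(storeName) },
        storeName // connects the transformer to the store
    )
```

If that matches, then .writeThrough("calls-store") is writing into the table's
own store rather than into the "calls" topic, which is the part I want to
confirm.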


Guozhang

On Tue, Feb 19, 2019 at 8:58 AM Trey Hutcheson <trey.hutche...@gmail.com>
wrote:

> Ok, I have a solution - I wrote a custom Transformer that simply accepts a
> key/value, writes it to a state store, and then returns the input
> unchanged. It's a "write-through" transformer, basically a peek operation
> that also saves the record to the backing state store. Since it's not a
> terminal operation, the stream can still perform a .to later.
>
> Basically, the transformer looks like this (in pseudo code, type parameters
> omitted)
> class WriteThroughTransformer(val storeName: String): Transformer {
>   lateinit var context: ProcessorContext
>   lateinit var store: KeyValueStore
>
>   // initialize context and store
>   override fun init() ...
>
>   // do a simple store.put - return inputs as KeyValue
>   override fun transform() ...
> }
>
> And then a handy extension function:
> fun <K,V> KStream<K,V>.writeThrough(storeName: String): KStream<K?, V?> =
> this.transform( // supplier code)
>
> And then finally, the stream chain looks like this:
> actions.filter { _, action -> action.action == Actions.ANSWER }
>     .join(callsTable) { _, call -> call }
>     .mapValues { call -> doAnswer(call) }
>     .writeThrough("calls-store")
>     .to("answered-calls")
>
> This approach doesn't actually send a message to the calls topic, but
> that's ok. The stream listening on answered-calls will eventually do that
> anyway.
>
> On Mon, Feb 18, 2019 at 2:02 PM Trey Hutcheson <trey.hutche...@gmail.com>
> wrote:
>
> > Ok, I tried your suggestion (along with several variations) and it just
> > doesn't work as I'd hoped. The constructed topology emits messages from
> > the .mapValues call to both the "calls" and "answered-calls" sinks. The
> > two sinks are siblings of one another, so there is still a race condition.
> >
> > So what are my options? I could spin off an asynchronous/delayed action to
> > send a message to the "answered-calls" topic, and just hope that the
> > answered call had been successfully persisted to the calls backing state
> > store. That seems awfully brittle to me.
> >
> > I could just move state outside of the streaming topology altogether,
> > using an external state store, so I could explicitly govern transaction
> > demarcation and guarantee consistency. But that seems antithetical to
> > Kafka Streams in general.
> >
> > On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > wrote:
> >
> >> Thanks for the response John. I'll see if I can track down that ticket.
> >>
> >> And thank you for your suggestion; I will try that once I get back to the
> >> code. That is an approach I had not considered.
> >>
> >> On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io> wrote:
> >>
> >>> Hi Trey,
> >>>
> >>> I think there is a ticket open requesting to be able to re-use the source
> >>> topic, so I don't think it's an intentional restriction, just a
> >>> consequence of the way the code is structured at the moment.
> >>>
> >>> Is it sufficient to send the update to "calls" and "answered-calls" at
> >>> the same time? You could do something like:
> >>>
> >>> val answeredCalls =
> >>>  actions.filter { _, action -> action == Actions.ANSWER }
> >>>   .join(callsTable) { id, call -> call }  // stream-table join, still a KStream
> >>>   .mapValues { call -> doAnswer(call) } // actual answer implementation
> >>>
> >>> answeredCalls.to("calls");
> >>> answeredCalls.to("answered-calls");
> >>>
> >>> Does that help?
> >>>
> >>> -John
> >>>
> >>>
> >>> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com>
> >>> wrote:
> >>>
> >>> > For context, imagine I'm building an IVR simulator. Desired workflow:
> >>> >
> >>> > IVR knows about a ringing call. IVR receives an IPC instruction to
> >>> > answer the call. That instruction is realized by sending a message
> >>> > {action=ANSWER} to the "actions" topic.
> >>> >
> >>> > At this point, the system needs to do two things: actually answer the
> >>> > call, and then start a recording of the call, in that order. Because of
> >>> > implementation peculiarities external to the system, assume that these
> >>> > two things cannot be executed together atomically.
> >>> >
> >>> > So this is what I'd *like* to do (warning, kotlin code, types omitted
> >>> > for brevity):
> >>> >
> >>> > val callsTable = builder.table("calls", ...)
> >>> > val actions = builder.stream("actions", ..)
> >>> >
> >>> > actions.filter { _, action -> action == Actions.ANSWER }
> >>> >   .join(callsTable) { id, call -> call }  // stream-table join, still a KStream
> >>> >   .mapValues { call -> doAnswer(call) } // actual answer implementation
> >>> >   .through("calls") // persist in state store
> >>> >   // let other actors in the system know the call was answered,
> >>> >   // such as start the recording process
> >>> >   .to("answered-calls")
> >>> >
> >>> > Now in the current version of the streams library (2.1.0), that little
> >>> > bit of topology throws an exception when trying to build it, with a
> >>> > message that a source has already been defined for the "calls" topic. So
> >>> > apparently the call to .through materializes a view and defines a
> >>> > source, which was already defined in the call to builder.table("calls")?
> >>> >
> >>> > So how do I do what I want? This sequence needs to happen in order. I
> >>> > have tried .branch, but that just ends up in a race condition (the thing
> >>> > doing the recording has to join to the calls table and filter on
> >>> > whether the call has been answered).
> >>> >
> >>> > I could create a custom processor that forwards to both sinks - but
> >>> > does that really solve the problem? And if it did, how do I create a
> >>> > KafkaStreams instance from a combination of StreamsBuilder and Topology?
> >>> >
> >>> > Thanks for the insight
> >>> > Trey
> >>> >
> >>>
> >>
>


-- 
-- Guozhang
