Hello Trey,

Just to clarify on your solution: the "calls-store" is the same one materialized from the calls table, as in "val callsTable = builder.table("calls", ... Materialized.as(.. "calls-store"))". So basically you are using the transformer to update the original materialized store for the callsTable here, is that right?

Guozhang

On Tue, Feb 19, 2019 at 8:58 AM Trey Hutcheson <trey.hutche...@gmail.com> wrote:

> Ok, I have a solution - I implemented a custom Transformer that simply accepts a key/value and writes it to a state store, then returns the input values. It's a "write-through" transformer. It's basically like a peek operation, but saves it to the backing state store. But since it's not a terminal operation, the stream can still perform a .to later.
>
> Basically, the transformer looks like this (in pseudo code, type parameters omitted)
> class WriteThroughTransformer(val storeName: String): Transformer {
>   lateinit var context: ProcessorContext
>   lateinit var store: KeyValueStore
>
>   // initialize context and store
>   override fun init() ...
>
>   // do a simple store.put - return inputs as KeyValue
>   override fun transform() ...
> }
>
> And then a handy extension function:
> fun <K,V> KStream<K,V>.writeThrough(storeName: String): KStream<K?, V?> =
>   this.transform( // supplier code)
>
> And then finally, the stream chain looks like this:
> actions.filter { _, action -> action.action == Actions.ANSWER }
>   .join(callsTable) { _, call -> call }
>   .mapValues { call -> doAnswer(call) }
>   .writeThrough("calls-store")
>   .to("answered-calls")
>
> This approach doesn't actually send a message to the calls topic, but that's ok. The stream listening on answered-calls will eventually do that anyway.
>
> On Mon, Feb 18, 2019 at 2:02 PM Trey Hutcheson <trey.hutche...@gmail.com> wrote:
>
> > Ok, I tried your suggestion (along with several variations) and it just doesn't work as I'd hoped. The constructed topology emits messages from the .mapValues call to both "calls" and "answered-calls" sinks, with the two sinks being siblings of one another, which still causes a race condition.
> >
> > So what are my options?
> > I could spin off an asynchronous/delayed action to send a message to the "answered-calls" topic, and just hope that the answered call had been successfully persisted to the calls backing state store. That seems awfully brittle to me.
> >
> > I could just move state outside of the streaming topology altogether, using an external state store, so I could explicitly govern transaction demarcation and guarantee consistency. But that seems antithetical to Kafka Streams in general.
> >
> > On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com> wrote:
> >
>> Thanks for the response John. I'll see if I can track down that ticket.
>>
>> And thank you for your suggestion; I will try that once I get back to the code. That is an approach I had not considered.
>>
>> On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io> wrote:
>>
>>> Hi Trey,
>>>
>>> I think there is a ticket open requesting to be able to re-use the source topic, so I don't think it's an intentional restriction, just a consequence of the way the code is structured at the moment.
>>>
>>> Is it sufficient to send the update to "calls" and "answered-calls" at the same time? You could do something like:
>>>
>>> val answeredCalls =
>>>   actions.filter { _, action -> action == Actions.ANSWER }
>>>     .join(callsTable) { id, call -> call } // stream-table join, result is still a KStream
>>>     .mapValues { call -> doAnswer(call) } // actual answer implementation
>>>
>>> answeredCalls.to("calls");
>>> answeredCalls.to("answered-calls");
>>>
>>> Does that help?
>>>
>>> -John
>>>
>>> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com> wrote:
>>>
>>> > For context, imagine I'm building an IVR simulator. Desired workflow:
>>> >
>>> > IVR knows about a ringing call. IVR receives an IPC instruction to answer the call.
>>> > That instruction is realized by sending a message {action=ANSWER} to the "actions" topic.
>>> >
>>> > At this point, the system needs to do two things: actually answer the call, and then start a recording of the call, in that order. Because of implementation peculiarities external to the system, assume that these two things cannot be executed together atomically.
>>> >
>>> > So this is what I'd *like* to do (warning, Kotlin code, types omitted for brevity):
>>> >
>>> > val callsTable = builder.table("calls", ...)
>>> > val actions = builder.stream("actions", ..)
>>> >
>>> > actions.filter { _, action -> action == Actions.ANSWER }
>>> >   .join(callsTable) { id, call -> call } // stream-table join, result is still a KStream
>>> >   .mapValues { call -> doAnswer(call) } // actual answer implementation
>>> >   .through("calls") // persist in state store
>>> >   .to("answered-calls") // let other actors in the system know the call was answered, such as start the recording process
>>> >
>>> > Now in the current version of the streams library (2.1.0), that little bit of topology throws an exception when trying to build it, with a message that a source has already been defined for the "calls" topic. So apparently the call to .through materializes a view and defines a source, which was already defined in the call to builder.table("calls")?
>>> >
>>> > So how do I do what I want? This sequence needs to happen in order. I have tried .branch, but that just ends up in a race condition (the thing doing the recording has to join to the calls table and filter that the call has been answered).
>>> >
>>> > I could create a custom processor that forwards to both sinks - but does that really solve the problem?
>>> > And if it did, how do I create a KafkaStreams instance from a combination of StreamsBuilder and Topology?
>>> >
>>> > Thanks for the insight
>>> > Trey

--
-- Guozhang
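
For readers following the thread, the write-through transformer that Trey describes in pseudo code could be fleshed out roughly as below. This is only a sketch against the 2.1-era Streams DSL discussed in the thread, not Trey's actual code. The String key/value types, the "ANSWER" literal, the ":answered" suffix standing in for doAnswer, and the buildTopology helper are all assumptions made for illustration. Later Streams releases changed both the store type behind builder.table and the transform API, so the cast and the transform call may need adjusting there.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.kstream.TransformerSupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Writes every record to the named store, then passes the record on unchanged.
class WriteThroughTransformer<K, V>(
    private val storeName: String
) : Transformer<K, V, KeyValue<K, V>> {
    private lateinit var store: KeyValueStore<K, V>

    @Suppress("UNCHECKED_CAST")
    override fun init(context: ProcessorContext) {
        // Assumption: the store is a plain KeyValueStore, as in Streams 2.1.
        store = context.getStateStore(storeName) as KeyValueStore<K, V>
    }

    override fun transform(key: K, value: V): KeyValue<K, V> {
        store.put(key, value)
        return KeyValue.pair(key, value)
    }

    override fun close() {}
}

// Extension function so the step reads like any other DSL operator.
// The store name is passed twice: once to the transformer, once to connect the store.
fun <K, V> KStream<K, V>.writeThrough(storeName: String): KStream<K, V> =
    transform(
        TransformerSupplier<K, V, KeyValue<K, V>> { WriteThroughTransformer<K, V>(storeName) },
        storeName
    )

// Hypothetical topology with String keys and values, mirroring the chain in the thread.
fun buildTopology(): Topology {
    val builder = StreamsBuilder()
    val strings = Serdes.String()

    val callsTable = builder.table(
        "calls",
        Consumed.with(strings, strings),
        Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("calls-store")
    )

    builder.stream("actions", Consumed.with(strings, strings))
        .filter { _, action -> action == "ANSWER" }
        .join(callsTable) { _, call -> call }      // ValueJoiner receives (action, call)
        .mapValues { call -> "$call:answered" }    // placeholder for doAnswer(call)
        .writeThrough("calls-store")
        .to("answered-calls", Produced.with(strings, strings))

    return builder.build()
}
```

Printing buildTopology().describe() should list calls-store against both the table source and the transform processor, which is a quick way to check that the transformer is attached to the table's own store. As noted in the thread, a write made this way never reaches the "calls" topic itself.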