For context, imagine I'm building an IVR simulator. Desired workflow: IVR knows about a ringing call. IVR receives an IPC instruction to answer the call. That instruction is realized by sending a message {action=ANSWER} to the "actions" topic.
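To make the message shape concrete, here is a minimal plain-Kotlin sketch with no Kafka involved. The `ActionMessage` class, the `HANGUP` action, the `isAnswer` helper and the call ids are hypothetical and only there for illustration. The one thing carried over from the real code is the idea of a record keyed by call id whose value is an `Actions` entry.

```kotlin
// Illustrative shapes only; the real types are omitted in this post.
enum class Actions { ANSWER, HANGUP } // HANGUP is an assumed second action

// One record on the "actions" topic: key = call id, value = action (assumed layout).
data class ActionMessage(val callId: String, val action: Actions)

// The same check the topology applies to each record's value.
fun isAnswer(message: ActionMessage): Boolean = message.action == Actions.ANSWER

fun main() {
    val incoming = listOf(
        ActionMessage("call-1", Actions.ANSWER),
        ActionMessage("call-2", Actions.HANGUP)
    )
    // Only the ANSWER instruction should reach the answer step.
    println(incoming.filter(::isAnswer).map { it.callId }) // prints [call-1]
}
```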
At this point, the system needs to do two things: actually answer the call, and then start a recording of the call, in that order. Because of implementation peculiarities external to the system, assume that these two things cannot be executed together atomically.

So this is what I'd *like* to do (warning, Kotlin code, types omitted for brevity):

    val callsTable = builder.table("calls", ...)
    val actions = builder.stream("actions", ..)

    actions.filter { _, action -> action == Actions.ANSWER }
        .join(callsTable) { id, call -> call }  // now a KTable
        .mapValues { call -> doAnswer(call) }   // actual answer implementation
        .through("calls")                       // persist in state store
        // let other actors in the system know the call was answered,
        // such as start the recording process
        .to("answered-calls")

Now in the current version of the streams library (2.1.0), that little bit of topology throws an exception when trying to build it, with a message that a source has already been defined for the "calls" topic. So apparently the call to .through materializes a view and defines a source, which was already defined in the call to builder.table("calls")?

So how do I do what I want? This sequence needs to happen in order. I have tried .branch, but that just ends up in a race condition (the thing doing the recording has to join to the calls table and filter on the call having been answered).

I could create a custom processor that forwards to both sinks - but does that really solve the problem? And if it did, how do I create a KafkaStreams instance from a combination of StreamsBuilder and Topology?

Thanks for the insight
Trey