For context, imagine I'm building an IVR simulator. Desired workflow:

IVR knows about a ringing call. IVR receives an IPC instruction to answer
the call. That instruction is realized by sending a message {action=ANSWER}
to the "actions" topic.

At this point, the system needs to do two things: actually answer the call,
and then start a recording of the call, in that order. Because of
implementation peculiarities external to the system, assume that these two
things cannot be executed together atomically.

So this is what I'd *like* to do (warning: Kotlin code, types omitted for
brevity):

val callsTable = builder.table("calls", ...)
val actions = builder.stream("actions", ...)

actions.filter { _, action -> action == Actions.ANSWER }
  // the joiner receives (action, call); the result is still a KStream
  .join(callsTable) { _, call -> call }
  .mapValues { call -> doAnswer(call) } // actual answer implementation
  .through("calls") // persist in state store
  // let other actors in the system know the call was answered,
  // e.g. so the recording process can start
  .to("answered-calls")

Now in the current version of the streams library (2.1.0), that little bit
of topology throws an exception when trying to build it, with a message
that a source has already been defined for the "calls" topic. So apparently
the call to .through materializes a view and defines a source, which was
already defined in the call to builder.table("calls")?
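
Here is a stripped-down sketch that I would expect to reproduce the same
failure. It assumes String keys and values and leaves out the join and
doAnswer, since only the duplicate use of the "calls" topic should matter;
tryBuild is just a name made up for this example. Note that .through is
deprecated in later releases, so this is only meant for the version above.

```kotlin
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.errors.TopologyException

// Returns the exception message if the topology is rejected,
// or null if it builds cleanly. (Hypothetical helper for illustration.)
fun tryBuild(): String? {
    val builder = StreamsBuilder()
    return try {
        // First source registered on the "calls" topic.
        builder.table<String, String>("calls")

        builder.stream<String, String>("actions")
            // through() writes to "calls" and then reads it back,
            // which registers a second source on the same topic.
            .through("calls")
            .to("answered-calls")

        builder.build()
        null
    } catch (e: TopologyException) {
        e.message
    }
}

fun main() {
    println(tryBuild() ?: "topology built without error")
}
```

If that reading is right, the conflict is between builder.table("calls") and
the source that .through("calls") adds, not anything specific to the join.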

So how do I do what I want? This sequence needs to happen in order. I have
tried .branch, but that just ends up in a race condition (the thing doing
the recording has to join to the calls table and filter on whether the call
has been answered).

I could create a custom processor that forwards to both sinks, but does
that really solve the problem? And if it did, how do I create a
KafkaStreams instance from a combination of StreamsBuilder and Topology?
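
On that last point, a minimal sketch of what I understand the combination to
look like: StreamsBuilder.build() returns a Topology, and KafkaStreams has a
constructor that takes a Topology plus Properties. The String types, the
"ANSWER" literal, the application id, and the broker address are all
placeholders I made up for the example, and buildTopology is a hypothetical
helper.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology

// Define the DSL part, then hand back the underlying Topology.
fun buildTopology(): Topology {
    val builder = StreamsBuilder()
    builder.stream<String, String>("actions")
        .filter { _, action -> action == "ANSWER" } // placeholder for Actions.ANSWER
        .to("answered-calls")
    return builder.build()
}

fun main() {
    val topology = buildTopology()
    // Processor API calls (addSource, addProcessor, addSink) could be
    // applied to `topology` here before it is handed to KafkaStreams.
    println(topology.describe())

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "ivr-simulator")      // assumed id
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // assumed broker
        put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
        put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    }

    val streams = KafkaStreams(topology, props)
    // streams.start() is left out because it needs a reachable broker.
    streams.close()
}
```

Whether a custom processor attached this way actually guarantees the
answer-then-record ordering is the part I am still unsure about.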

Thanks for the insight
Trey
