Your observation is correct and by design. The operation after the flatMap() that read from the repartition topic, always needs to read from this topic. And the record was already successfully written into this topic.
Thus, processing the input record is "finished" from the point of view of the first sub-topology. There is no need to reprocess from the source topic. If we would reprocess from the source topic, the result record would end up in the re-partitioning topic twice and the operation after the flatMap() would process both. I see you point about the stream-table join. This join is not completely deterministic atm -- it's a know issue but hard to resolve. But we are working on it... Stay tuned for future release. -Matthias On 2/5/18 9:09 AM, Saïd Bouras wrote: > Hi everyone; > > I have a question about the record that is currently processed by the kafka > stream app when this app stop (suddenly or not). > > When restarting the app, the last record that was processed before the > shutdown is replayed, but I noticed that the topology don't replay the > entire DAG for that record and thus don't replay from the inputs topics of > the topology but from the last internal topic associate to a processor node. > > Example : > *leftJoin -> selectKey -> flatMap-> someOtherOperatio ->...* > > At the *flatMap* processor, we have a internal topic with *"repartition"* > suffix, and if the topology is stopped at that moment, when it restart, the > last record is picked from the repartition topic and try to execute the > operation after the *flatMap.* > > The behavior that I was expecting was to pick the message from the inputs > topics and try to replay the entire topology for that record. > > In the example, at the beginning we join with an other Ktable, the result > of that join is processed and written in the internal topic, the app > crashed, in the meantime the state of the ktable has changed and when the > topology restart, the message from the internal topic is processed with an > out dated message resulting of the left join before the crash. > Furthermore, if the app crashed because of that particular record, when the > topology restart it will crashed again.. > > The actual behavior is of course linked to the actual implementation of > consumers and producers in kafka streams (consumers retry to read the last > offset of the internal topic when the topology restart). > > Is the actual behavior not correct or I am missing something here ? > > Thanks, regards >
signature.asc
Description: OpenPGP digital signature