Hello Virgil,

Your use case is a common scenario for multi-threaded async processing, and your concern about records going missing when offsets are committed during a rebalance is valid. If the changelog is disabled and the task gets migrated to another instance during a rebalance, that task will resume with an empty state; if the task happens to stay on the same instance, it will resume with whatever data is already in the local state store.
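To make these rebalance outcomes concrete, here is a minimal Python model that also includes the changelog-enabled case for comparison. It is not Kafka Streams code: `TaskStore`, `replay`, and `reassign` are hypothetical names, and the changelog topic is modeled as an append-only list of key/value records where `None` is a tombstone. It assumes, as described above, that a task staying on the same instance keeps its local data. It ignores standby replicas and incremental restoration.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for a changelog topic: (key, value) records, value None = tombstone.
ChangelogRecord = Tuple[str, Optional[str]]


def replay(changelog: List[ChangelogRecord]) -> Dict[str, str]:
    """Rebuild a key-value store by replaying changelog records in order."""
    store: Dict[str, str] = {}
    for key, value in changelog:
        if value is None:
            store.pop(key, None)  # tombstone: the key was deleted
        else:
            store[key] = value
    return store


class TaskStore:
    """Hypothetical model of one task's key-value store (not the Kafka Streams API)."""

    def __init__(self, logging_enabled: bool) -> None:
        self.logging_enabled = logging_enabled
        self.local: Dict[str, str] = {}  # the owning instance's local copy
        self.changelog: List[ChangelogRecord] = []  # replicated, survives instance loss

    def put(self, key: str, value: str) -> None:
        self.local[key] = value
        if self.logging_enabled:
            self.changelog.append((key, value))

    def delete(self, key: str) -> None:
        self.local.pop(key, None)
        if self.logging_enabled:
            self.changelog.append((key, None))

    def reassign(self, moved_to_new_instance: bool) -> Dict[str, str]:
        """Return the store contents the task sees after a rebalance."""
        if not moved_to_new_instance:
            # Same instance: whatever is already in the local store is still there.
            return dict(self.local)
        if self.logging_enabled:
            # New instance: the store is rebuilt from the changelog topic.
            self.local = replay(self.changelog)
        else:
            # New instance and no changelog: there is nothing to restore from.
            self.local = {}
        return dict(self.local)
```

For example, a store created with `logging_enabled=False` that holds one pending message returns `{}` from `reassign(moved_to_new_instance=True)`, but still holds the message after `reassign(moved_to_new_instance=False)`.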
As of today, I think one workaround is to keep the changelog topic enabled for your state store. That way, any records that have not been fully processed before the rebalance are still in the store afterwards (restored from the changelog if the task moves to another instance), and once they have finished processing we can remove them from the store. I've also filed https://issues.apache.org/jira/browse/KAFKA-6989 as a general feature request; please feel free to share your feedback there.

Guozhang

On Mon, Jun 4, 2018 at 7:58 AM, Virgil Palanciuc <virg...@adobe.com.invalid> wrote:

> Hi,
>
> I’m trying to “enrich” a stream of events (i.e. roughly speaking read
> messages from one topic, make a query to an external system, write to
> another topic). The problem is – external system can handle lots of calls,
> but has a high latency.
> I can easily do the enrichment if I use one thread per message, but that's
> kinda' wasteful (I'd need a lot of partitions/ tasks to have reasonable
> throughput).... so, I'm thinking about using multiplexed IO. Like, have a
> processor that just registers the "input message" in a local state store,
> without committing the task; and then, the punctuator can look at the
> registered "input messages", start the IO for all of them, forward the
> results for the completed IO tasks to subsequent processors, and commit
> progress. The state store can be non-replicated, since I can reprocess
> messages in case of failure (and I don't really mind duplication of
> messages on the output topic)
>
> The question is... would that work? I'm concerned about rebalancing (when
> one worker is added/removed), and specifically:
> - If I understand correctly the code, during partition rebalance the tasks
> may be suspended, and the suspend() method will actually commit the offsets
> (the last task where process() was completed, regardless whether it invoked
> commit() or not). That'd be bad, since it means that on rebalance, I might
> end up skipping records (I'm not concerned about duplication, within
> reasonable bounds; but I am concerned about skipping records)
> - Not sure what happens to the state during rebalancing (if I disable the
> changelog - i.e. I make all the state local). Is all state lost?
>
> Thanks,
> Virgil.
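The pattern proposed in the quoted message, combined with the workaround of keeping the changelog enabled, can be modeled in plain Python to show why pending records are not skipped even if a rebalance commits offsets before they have been enriched. This is a hypothetical sketch, not the Kafka Streams API: `EnrichmentTask`, `suspend`, and keying the pending store by input offset are illustrative choices. The changelog is again a list of records with `None` as a tombstone, and `asyncio.gather` stands in for the multiplexed IO against the external system. The model assumes a changelog write is durable before the corresponding input offset is committed. In the real Java API, changelogging is on by default for stores built with `Stores.keyValueStoreBuilder(...)` and is turned off with `withLoggingDisabled()`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Tuple

# Hypothetical stand-in for the changelog topic: (offset, message) records, None = tombstone.
Changelog = List[Tuple[int, Optional[str]]]
Lookup = Callable[[str], Awaitable[str]]


def replay(changelog: Changelog) -> Dict[int, str]:
    """Rebuild the pending store from the changelog (what a new owner would see)."""
    store: Dict[int, str] = {}
    for offset, message in changelog:
        if message is None:
            store.pop(offset, None)
        else:
            store[offset] = message
    return store


class EnrichmentTask:
    """Hypothetical model of the proposed processor + punctuator with a logged store."""

    def __init__(self, lookup: Lookup, changelog: Changelog) -> None:
        self.lookup = lookup              # async call to the high-latency external system
        self.changelog = changelog        # shared across instances, like a real topic
        self.pending = replay(changelog)  # restored on (re)assignment
        self.output: List[str] = []       # stands in for the output topic
        self.last_processed = -1

    def process(self, offset: int, message: str) -> None:
        # Only register the message; the external call happens in punctuate().
        self.pending[offset] = message
        self.changelog.append((offset, message))
        self.last_processed = offset

    def punctuate(self) -> None:
        if not self.pending:
            return
        offsets = sorted(self.pending)

        async def run_all() -> List[str]:
            # Issue all lookups concurrently instead of using one thread per message.
            return await asyncio.gather(*(self.lookup(self.pending[o]) for o in offsets))

        for offset, result in zip(offsets, asyncio.run(run_all())):
            self.output.append(result)             # forward the enriched record
            del self.pending[offset]
            self.changelog.append((offset, None))  # remove it once fully processed

    def suspend(self) -> int:
        # Models the concern in the quoted message: on suspend, offsets are committed
        # past the last process() call even if those records are still pending.
        # Returns the next offset to read, which is what a Kafka offset commit stores.
        return self.last_processed + 1
```

With an async stand-in such as `async def fake_lookup(m): return m.upper()`, calling `process(2, "c")` and `process(3, "d")`, then `suspend()` before any `punctuate()`, and then building a new `EnrichmentTask` from the same changelog list yields `pending == {2: "c", 3: "d"}`, so the new owner still enriches both records. A crash between forwarding a result and writing its tombstone would re-enrich that record, which matches the at-least-once behavior the quoted message says is acceptable.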