Hi Guozhang,

Thank you for writing up KIP-28. (I hope this is the right thread for me to post some comments. :)

I still have some confusion about the implementation of the Processor:

1. Why do we maintain a separate consumer and producer for each worker thread?
From my understanding, the new consumer API will be able to fetch specific topic-partitions. Is one consumer enough for one Kafka.process (shared among the worker threads)? The same goes for the producer: is one producer enough for sending messages to the brokers? Would this give better performance?

2. How is the “Stream Synchronization” achieved?
You talked about “pause” and “notify” for the consumer, but it is still not very clear to me. Suppose a worker thread has group_1 {topicA-0, topicB-0} and group_2 {topicA-1, topicB-1}, and topicB is much slower. How can we pause the consumer to sync topicA and topicB if there is only one consumer?

3. How does the partition timestamp monotonically increase?
“When the lowest timestamp corresponding record gets processed by the thread, the partition time possibly gets advanced.” How does the “gets advanced” part work? Do we pick another “lowest message timestamp value”? If so, that may not yield an “advanced” timestamp.

4. Thoughts about the local state management.
From the description, I think there is one kv store per partition group. That means that if one worker thread is assigned more than one partition group, it will have more than one kv-store connection. How can we avoid mistakes here? One partition group could easily write to another partition group’s kv store, since they are in the same thread.

5. Do we plan to implement throttling?
Since we are “forwarding” the messages, it is very possible that an upstream processor is much faster than a downstream processor. How do we plan to deal with this?

6. How does the parallelism work?
Do we achieve this by simply adding more threads? Or do we plan to have a mechanism that can deploy different threads to different machines? It is easy to imagine deploying different processors to different machines, but what about the worker threads?
And how is fault tolerance handled? Maybe this is out of scope for the KIP?

Two nits in the KIP-28 doc:

1. The “close” method is missing from the interface Processor<K1,V1,K2,V2>, while KafkaProcessor has an “override close()”.

2. “punctuate” does not accept a parameter, while StatefulProcessJob has a punctuate method that accepts one.

Thanks,
Yan
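
P.S. To make question 3 concrete, here is a minimal sketch of how I would guess a monotonic partition time could be kept. This is only my assumption, not what the KIP specifies: the class name PartitionTimeSketch and its methods are hypothetical, and I am assuming the partition time is the lowest timestamp among the buffered records, clamped so that it never moves backwards when a late record arrives.

```java
import java.util.PriorityQueue;

// Hypothetical sketch, not taken from KIP-28: tracks a per-partition time
// that is derived from the lowest buffered timestamp but never decreases.
class PartitionTimeSketch {
    // Min-heap of the timestamps of records buffered for this partition.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();
    // Long.MIN_VALUE means "no record seen yet".
    private long partitionTime = Long.MIN_VALUE;

    // Buffer a newly fetched record's timestamp.
    void add(long timestamp) {
        buffered.add(timestamp);
        refresh();
    }

    // Process (remove) the record with the lowest timestamp and return it.
    long processLowest() {
        Long lowest = buffered.poll();
        if (lowest == null) {
            throw new IllegalStateException("no buffered records");
        }
        refresh();
        return lowest;
    }

    long partitionTime() {
        return partitionTime;
    }

    // Assumption: partition time = lowest buffered timestamp, clamped with
    // max() so that a late (out-of-order) record cannot move it backwards.
    private void refresh() {
        Long lowest = buffered.peek();
        if (lowest != null) {
            partitionTime = Math.max(partitionTime, lowest);
        }
    }
}
```

With this reading, processing the lowest record only "possibly" advances the time: it advances when the next lowest buffered timestamp is larger than the current partition time, and stays put otherwise. Is that roughly what the KIP intends?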