Thanks a lot Bruno! I am now trying the Processor API as you mentioned above, so the processor writes records to another Kafka topic with the new key. I am having difficulty reading from that Kafka topic in another processor, though, and I am wondering whether I need to add another source backed by the intermediate Kafka topic. Otherwise, how can I read from the intermediate topic in another processor?
On Fri, Jul 14, 2023 at 9:25 PM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Pushkar,
>
> The events after repartitioning are processed by a different task than
> the task that read the events from the source topic. The task assignor
> assigns those tasks to stream threads, so events with the same key will
> be processed by the same task. As far as I understood from your earlier
> message, you want events with the same call ID to be processed by the
> same application instance after the repartitioning. If the same keys are
> processed by the same task, that implies the events are processed on the
> same application instance.
>
> With the Processor API you can also repartition data by specifying a new
> key for each record and writing the records to a Kafka topic. With
> another processor you can then read the repartitioned events from that
> topic. However, you have to manage the intermediate topic yourself.
>
> In the DSL there is the process() method that allows you to use custom
> code for processing events, similar to the Processor API.
>
> Best,
> Bruno
>
> On 7/14/23 5:09 PM, Pushkar Deole wrote:
> > Thanks Bruno..
> >
> > What do you mean exactly with "...and then process them in that order"?
> >
> > By this, I mean to ask whether the order of events in a partition is
> > preserved after repartitioning. I probably don't need to go through the
> > internal details, but are the partitions of the topic again assigned to
> > stream thread tasks, so that an event could be processed by a different
> > stream thread than the one that read it from the source partition?
> >
> > Also, the repartition() method is available as part of the Streams DSL
> > API. We seem to be using the Streams Processor API, so is there a
> > similar way to achieve repartitioning in the Processor API?
> >
> > Or, if we want to go the other way round and move from the Processor
> > API to the Streams DSL, how can we migrate, i.e. where do we keep the
> > logic that we normally put in a Processor in the Topology?
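[Editor's note: the Processor API approach Bruno describes above can be sketched as a Topology that re-keys records, writes them to a self-managed intermediate topic via a sink, and then wires a second source on that same topic. This is only an illustrative sketch; the topic names, node names, and the extractCallId() helper are made up, and the second processor is a placeholder for real business logic.]

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class RepartitionTopology {

    // First processor: replaces the key with the call ID and forwards the
    // record; the sink below writes it to the intermediate topic.
    static class RekeyProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            String callId = extractCallId(record.value()); // hypothetical helper
            context.forward(record.withKey(callId));
        }
    }

    static String extractCallId(String value) {
        return value; // placeholder for real call-ID extraction logic
    }

    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("input-source", Serdes.String().deserializer(),
                Serdes.String().deserializer(), "input-topic");
        topology.addProcessor("rekey", RekeyProcessor::new, "input-source");
        // Sink writes the re-keyed records to the self-managed intermediate topic.
        topology.addSink("rekey-sink", "intermediate-topic",
                Serdes.String().serializer(), Serdes.String().serializer(), "rekey");
        // A second source reads the repartitioned records back in; processors
        // attached here see all events with the same call ID on the same task.
        topology.addSource("intermediate-source", Serdes.String().deserializer(),
                Serdes.String().deserializer(), "intermediate-topic");
        // Placeholder: in a real application this would be the business-logic processor.
        topology.addProcessor("downstream", RekeyProcessor::new, "intermediate-source");
        return topology;
    }
}
```

Note that, as Bruno says, "intermediate-topic" must be created and sized (partition count, retention) by you; Kafka Streams does not manage it.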
> >
> > On Fri, Jul 14, 2023 at 3:50 PM Bruno Cadonna <cado...@apache.org>
> > wrote:
> >
> >> Hi Pushkar,
> >>
> >> you can use repartition() to repartition your data. The method
> >> through() is actually deprecated in favor of repartition(). Before you
> >> repartition you need to specify the new key with selectKey().
> >>
> >> What do you mean exactly with "...and then process them in that order"?
> >>
> >> The order of events from the same source partition (the partition
> >> before repartitioning) that have the same call ID (or, more generally,
> >> that end up in the same partition after repartitioning) will be
> >> preserved, but Kafka does not guarantee the order of events from
> >> different source partitions.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 7/9/23 2:45 PM, Pushkar Deole wrote:
> >>> Hi,
> >>>
> >>> We have a Kafka Streams application that consumes from multiple
> >>> topics with different keys. Before processing these events in the
> >>> application, we want to repartition them on a single key that ensures
> >>> related events are processed by the same application instance. For
> >>> example, the events on multiple topics are related to the same call,
> >>> but they are keyed differently on those topics and hence go to
> >>> different application instances; after consuming those events in the
> >>> streams application, we want to partition them again on the call ID
> >>> and then process them in that order. Is this possible?
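[Editor's note: the DSL route recommended above, selectKey() followed by repartition(), might look roughly like the sketch below. The topic name and the call-ID extraction lambda are illustrative assumptions, not part of the original thread.]

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class DslRepartition {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events =
                builder.stream("events-topic", Consumed.with(Serdes.String(), Serdes.String()));
        events
            // Re-key each record on the call ID (the extraction logic is a placeholder).
            .selectKey((key, value) -> value.split(",")[0])
            // repartition() writes to an internal topic that Kafka Streams creates
            // and manages itself, so records with the same call ID land in the
            // same partition and are processed by the same task.
            .repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
            .foreach((callId, value) -> System.out.println(callId + " -> " + value));
        return builder.build();
    }
}
```

Unlike the Processor API variant, the internal `-repartition` topic here is created, named, and cleaned up by Kafka Streams.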
> >>> I got to know about the through() method on KStream, which seems to
> >>> do something similar, but I am not sure whether it would achieve the
> >>> functionality below:
> >>>
> >>> A call with id C123 is initiated and the following events arrive on
> >>> 3 topics with their respective keys:
> >>>
> >>> Event 1 on TopicA: with key a1
> >>> Event 2 on TopicB: with key b1
> >>> Event 3 on TopicC: with key c1
> >>>
> >>> Let's assume these are consumed by 3 different instances of the Kafka
> >>> Streams application, and those instances process them with the
> >>> through() method to a consolidatedTopic with the key C123.
> >>> Will this ensure that these 3 events go to the same partition of
> >>> consolidatedTopic, and hence, after repartitioning, will be consumed
> >>> by the same instance of the application?
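[Editor's note: for the three-topic scenario above, a hedged sketch of the deprecated-through()-free approach is to re-key each stream on the call ID, merge the streams, and repartition once. The topic names come from the example; the call-ID extraction is an assumed placeholder. Because all three events end up keyed C123 on a single repartition topic, they land in the same partition and hence on the same application instance.]

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class ConsolidateByCall {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());

        KStream<String, String> a = builder.stream("TopicA", consumed);
        KStream<String, String> b = builder.stream("TopicB", consumed);
        KStream<String, String> c = builder.stream("TopicC", consumed);

        a.merge(b).merge(c)
         // Re-key every event on the call ID (e.g. C123); extraction is a placeholder.
         .selectKey((key, value) -> extractCallId(value))
         // A single repartition step: same call ID -> same partition -> same task,
         // and therefore the same application instance.
         .repartition(Repartitioned.<String, String>with(Serdes.String(), Serdes.String())
                                   .withName("by-call-id"));
        return builder.build();
    }

    static String extractCallId(String value) {
        return value; // placeholder for real call-ID extraction logic
    }
}
```

The guarantee holds because Kafka's default partitioner is deterministic in the key, so every record keyed C123 hashes to the same partition of the "by-call-id" repartition topic regardless of which instance produced it.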