Thanks a lot Bruno!

I am trying the Processor API as you suggested above, so that one processor
writes each record to another Kafka topic with a new key. I am having
difficulty reading from that Kafka topic in another processor, and I am
wondering whether I need to create another stream with the intermediate
Kafka topic as its source. If not, how can I read from the intermediate
topic in another processor?
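
A minimal sketch of one way this could be wired with the Processor API,
assuming the newer org.apache.kafka.streams.processor.api types are
available. Within a single Topology, a second addSource() on the
intermediate topic feeds the second processor. The topic names, node
names, the "callId|payload" value format, and the extractCallId() helper
are hypothetical, chosen only for illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

final class ProcessorApiRepartitionSketch {

    // Hypothetical topic names; the intermediate topic is not managed by
    // Kafka Streams here and has to be created by hand.
    static final String SOURCE_TOPIC = "TopicA";
    static final String INTERMEDIATE_TOPIC = "consolidatedTopic";

    // Assumption: values look like "callId|payload". Replace with real parsing.
    static String extractCallId(String value) {
        int sep = value.indexOf('|');
        return sep < 0 ? value : value.substring(0, sep);
    }

    /** First processor: replaces the record key with the call ID and forwards it. */
    static class RekeyProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            context.forward(record.withKey(extractCallId(record.value())));
        }
    }

    /** Second processor: receives records that were read back from the intermediate topic. */
    static class CallProcessor implements Processor<String, String, Void, Void> {
        @Override
        public void process(Record<String, String> record) {
            System.out.printf("call %s -> %s%n", record.key(), record.value());
        }
    }

    static Topology buildTopology() {
        ProcessorSupplier<String, String, String, String> rekey = RekeyProcessor::new;
        ProcessorSupplier<String, String, Void, Void> handleCall = CallProcessor::new;

        Topology topology = new Topology();
        // Part 1: read the source topic, re-key, write to the intermediate topic.
        topology.addSource("source",
                Serdes.String().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC);
        topology.addProcessor("rekey", rekey, "source");
        topology.addSink("to-intermediate", INTERMEDIATE_TOPIC,
                Serdes.String().serializer(), Serdes.String().serializer(), "rekey");
        // Part 2: a second source node on the intermediate topic feeds the second processor.
        topology.addSource("from-intermediate",
                Serdes.String().deserializer(), Serdes.String().deserializer(), INTERMEDIATE_TOPIC);
        topology.addProcessor("handle-call", handleCall, "from-intermediate");
        return topology;
    }
}
```

The resulting topology can be passed to new KafkaStreams(topology, props)
as usual. The two parts are linked only through the topic, so the records
are re-read from consolidatedTopic by whichever task owns the partition
that the new key maps to.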

On Fri, Jul 14, 2023 at 9:25 PM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Pushkar,
>
> The events after repartitioning are processed by a different task than
> the task that read the events from the source topic. The task assignor
> assigns those tasks to stream threads. So events with the same key will
> be processed by the same task. As far as I understood from your earlier
> message, you want events with the same call ID to be processed by the
> same application instance after the repartitioning. If the same keys are
> processed by the same task, then the events are also processed on the
> same application instance.
>
> With the Processor API you can also repartition data by specifying a new
> key for each record and writing the records to a Kafka topic. With
> another processor you can then read the repartitioned events from that
> topic. However, you have to manage the intermediate topic yourself.
>
> In the DSL there is the process() method, which allows you to use custom
> code for processing events, similar to the Processor API.
>
> Best,
> Bruno
>
> On 7/14/23 5:09 PM, Pushkar Deole wrote:
> > Thanks Bruno..
> >
> > What do you mean exactly with "...and then process them in that order"?
> >
> > By this, I mean whether the order of events within a partition is
> > preserved when they are processed after repartitioning. I probably don't
> > need to go into the internal details, but are the partitions of the topic
> > assigned again to stream thread tasks, so that an event could be
> > processed by a different stream thread than the one that read it from
> > the source partition?
> >
> > Also, the repartition method is available as part of the Streams DSL
> > API. We seem to be using the Streams Processor API, so is there a similar
> > way to achieve repartitioning in the Processor API?
> >
> > Or, going the other way round, if we want to move from the Processor API
> > to the Streams DSL, how can we migrate, i.e. where do we put the logic
> > that we normally keep in a Processor in the Topology?
> >
> > On Fri, Jul 14, 2023 at 3:50 PM Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> Hi Pushkar,
> >>
> >> you can use repartition() to repartition your data. Method through() is
> >> actually deprecated in favor of repartition(). Before you repartition
> >> you need to specify the new key with selectKey().
> >>
> >> What do you mean exactly with "...and then process them in that order"?
> >>
> >> The order of the events from the same source partition (partition before
> >> repartitioning) that have the same call ID (or more generally that end
> >> up in the same partition after repartitioning) will be preserved but
> >> Kafka does not guarantee the order of events from different source
> >> partitions.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 7/9/23 2:45 PM, Pushkar Deole wrote:
> >>> Hi,
> >>>
> >>> We have a Kafka Streams application that consumes from multiple topics
> >>> with different keys. Before processing these events in the application,
> >>> we want to repartition them on a single key so that related events are
> >>> processed by the same application instance. For example, the events on
> >>> multiple topics relate to the same call, but they are keyed on
> >>> different keys on those topics and hence go to different application
> >>> instances. After the streams application consumes those events, we want
> >>> to partition them again on the call ID and then process them in that
> >>> order. Is this possible?
> >>> I got to know about the through() method on KStream, which seems to do
> >>> something similar, but I am not sure whether it would achieve the
> >>> following:
> >>>
> >>> A call with ID C123 is initiated and the following events arrive on 3
> >>> topics with their respective keys:
> >>>
> >>> Event 1 on TopicA: with key a1
> >>> Event 2 on TopicB: with key b1
> >>> Event 3 on TopicC: with key c1
> >>>
> >>> Let's assume these are consumed by 3 different instances of the Kafka
> >>> Streams application, and those instances pass them through the
> >>> through() method to a consolidatedTopic with the key C123.
> >>> Will this ensure that these 3 events go to the same partition on
> >>> consolidatedTopic, and hence that after repartitioning they will be
> >>> consumed by the same instance of the application?
> >>>
> >>
> >
>

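For comparison, the DSL route discussed in the thread (selectKey(), then
repartition(), then process()) might look roughly like the sketch below.
It assumes a Kafka Streams version in which KStream#process() accepts the
newer processor.api.ProcessorSupplier. The repartition name "by-call-id",
the "callId|payload" value format, and the extractCallId() helper are
hypothetical.

```java
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

final class DslRepartitionSketch {

    // Assumption: values look like "callId|payload". Replace with real parsing.
    static String extractCallId(String value) {
        int sep = value.indexOf('|');
        return sep < 0 ? value : value.substring(0, sep);
    }

    /** Custom per-record logic, written as it would be for the Processor API. */
    static class CallProcessor implements Processor<String, String, Void, Void> {
        @Override
        public void process(Record<String, String> record) {
            System.out.printf("call %s -> %s%n", record.key(), record.value());
        }
    }

    static Topology buildTopology() {
        ProcessorSupplier<String, String, Void, Void> handleCall = CallProcessor::new;

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(List.of("TopicA", "TopicB", "TopicC"),
                        Consumed.with(Serdes.String(), Serdes.String()))
                // Specify the new key (the call ID) before repartitioning.
                .selectKey((oldKey, value) -> extractCallId(value))
                // Kafka Streams creates and manages the repartition topic itself.
                .repartition(Repartitioned.with(Serdes.String(), Serdes.String())
                        .withName("by-call-id"))
                // Runs downstream of the repartition topic, in a separate sub-topology.
                .process(handleCall);
        return builder.build();
    }
}
```

Unlike the hand-wired Processor API variant, the intermediate topic here
is an internal repartition topic, so it does not have to be created or
managed manually.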