Thank you, Bharath and Boris. I really appreciate your responses and guidance!

Regards,

Eric

On Tue, Oct 22, 2019 at 6:00 PM Boris S <bor...@gmail.com> wrote:

> Yes, to answer your second question: you should be able to fork the
> streams. MessageStreams are idempotent and can be reused in different
> pipelines.
>
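> (A minimal, self-contained sketch of such a fork, assuming the Samza 1.x
> high-level API; the system name, topic names, serdes, and processing logic
> below are hypothetical:)
>
> import org.apache.samza.application.StreamApplication;
> import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
> import org.apache.samza.operators.MessageStream;
> import org.apache.samza.serializers.StringSerde;
> import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
> import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
> import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
>
> public class ForkExample implements StreamApplication {
>   @Override
>   public void describe(StreamApplicationDescriptor appDescriptor) {
>     // Broker/ZooKeeper config omitted for brevity.
>     KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
>     KafkaInputDescriptor<String> input =
>         kafka.getInputDescriptor("input-topic", new StringSerde());
>     KafkaOutputDescriptor<String> output =
>         kafka.getOutputDescriptor("output-topic", new StringSerde());
>
>     // One physical subscription to the input topic ...
>     MessageStream<String> ms = appDescriptor.getInputStream(input);
>
>     // ... fanned out into two independent downstream pipelines.
>     ms.filter(msg -> msg.contains("error"))
>       .sendTo(appDescriptor.getOutputStream(output));
>     ms.map(String::toUpperCase).sink((msg, collector, coordinator) -> {
>       // e.g. write the transformed message to an external store
>     });
>   }
> }
>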
> With regard to the group id: Samza does set group ids, but, as far as I
> know, they are the same for the whole job. The idea behind this is that
> Samza does not rely on Kafka's consumer-group partition assignment;
> Samza itself distributes the partitions directly to its tasks.
>
> Boris.
>
> On Tue, Oct 22, 2019 at 11:55 AM Bharath Kumara Subramanian <
> codin.mart...@gmail.com> wrote:
>
> > Hi Eric,
> >
> > > Based on the source code, it appears that each job designates a unique
> > > group id when subscribing to a Kafka topic; is my understanding correct?
> > >
> >
> > Yes. Samza uses a combination of the job name and the job id to
> > generate the group id.
> >
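> > (For illustration only -- a hypothetical sketch of that idea; the exact
> > "name-id" format below is an assumption, not verified against the
> > source:)
> >
> > class GroupIds {
> >   // Hypothetical: the group id is a pure function of the job.name and
> >   // job.id configs, so every Kafka consumer in one job shares it.
> >   static String groupIdFor(String jobName, String jobId) {
> >     return jobName + "-" + jobId; // e.g. "page-view-pipeline-1"
> >   }
> > }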
> >
> > > is it possible to have 2 independent stacks of operations applied on
> > > the same InputStream?
> >
> >
> > Yes. The code snippet provided in the email should work as expected.
> >
> > Hope that helps.
> >
> > Thanks,
> > Bharath
> >
> > On Fri, Oct 18, 2019 at 5:55 AM Eric Shieh <datosl...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Based on the source code, it appears that each job designates a unique
> > > group id when subscribing to a Kafka topic; is my understanding
> > > correct?  If so, since one cannot call appDescriptor.getInputStream
> > > with the same KafkaInputDescriptor twice, is it possible to have 2
> > > independent stacks of operations applied on the same InputStream?  In
> > > essence, I have a requirement to process a message from 1 InputStream
> > > and write to 2 OutputStreams or sinks after applying 2 different,
> > > independent stacks of operations.  One way to solve this is to deploy
> > > 2 independent jobs, but the downside is that it would be difficult to
> > > keep the 2 jobs synchronized.  Is it possible to do the following:
> > >
> > > MessageStream<Map<String, Object>> ms = appDescriptor.getInputStream(kid);
> > > MessageStream<Map<String, Object>> msForkPoint = ms.map(mapping_logic1);
> > > msForkPoint.filter(filter_logic_1).sendTo(outputStream1);
> > > msForkPoint.map(mapping_logic2).sink(write_to_DB);
> > >
> > > Based on the source code, each operation instantiates a new instance
> > > of MessageStream and registers the new StreamOperatorSpec with the
> > > previous MessageStream instance's StreamOperatorSpec, essentially
> > > forming a "linked list" of parent-child StreamOperatorSpecs.  Since
> > > each parent OperatorSpec maintains a LinkedHashSet of next
> > > OperatorSpecs, forking 2 independent operation stacks after the
> > > initial map, as in the code above, seems feasible.
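> > >
> > > (A hypothetical, simplified model of that parent-to-children
> > > registration -- these are not Samza's actual classes, just an
> > > illustration of why the fan-out works:)
> > >
> > > import java.util.LinkedHashSet;
> > > import java.util.Set;
> > >
> > > // Simplified stand-in for an operator spec node in the "linked list".
> > > class OpSpec {
> > >   final Set<OpSpec> nextOps = new LinkedHashSet<>();
> > >
> > >   void registerNext(OpSpec next) {
> > >     nextOps.add(next); // insertion order kept, duplicates ignored
> > >   }
> > > }
> > >
> > > public class ForkShape {
> > >   public static void main(String[] args) {
> > >     OpSpec initialMap = new OpSpec();
> > >     initialMap.registerNext(new OpSpec()); // filter(...).sendTo(...) branch
> > >     initialMap.registerNext(new OpSpec()); // map(...).sink(...) branch
> > >     System.out.println(initialMap.nextOps.size()); // 2 -> fan-out to both
> > >   }
> > > }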
> > >
> > > Regards,
> > >
> > > Eric
> > >
> >
>
