Thank you Bharath and Boris, I really appreciate your response and guidance!
Regards,
Eric

On Tue, Oct 22, 2019 at 6:00 PM Boris S <bor...@gmail.com> wrote:

> Yes, to answer your second question, you should be able to fork the
> streams. Message streams are idempotent and can be used in different
> pipelines.
>
> With regards to group id, Samza does set the group ids, but I think they
> are the same for the whole job. The idea behind it is that Samza does not
> use Kafka-level partitioning; Samza itself distributes the partitions
> directly to its tasks.
>
> Boris.
>
> On Tue, Oct 22, 2019 at 11:55 AM Bharath Kumara Subramanian <
> codin.mart...@gmail.com> wrote:
>
> > Hi Eric,
> >
> > > Based on the source code, it appears that each job designates a unique
> > > group id when subscribing to a Kafka topic; is my understanding
> > > correct?
> >
> > Yes. Samza uses a combination of job name and job id to generate the
> > group id.
> >
> > > Is it possible to have 2 independent stacks
> > > of operations applied on the same InputStream?
> >
> > Yes. The code snippet provided in the email should work as expected.
> >
> > Hope that helps.
> >
> > Thanks,
> > Bharath
> >
> > On Fri, Oct 18, 2019 at 5:55 AM Eric Shieh <datosl...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Based on the source code, it appears that each job designates a unique
> > > group id when subscribing to a Kafka topic; is my understanding
> > > correct? If so, since one cannot call appDescriptor.getInputStream
> > > with the same KafkaInputDescriptor twice, is it possible to have 2
> > > independent stacks of operations applied on the same InputStream? In
> > > essence, I have a requirement to process a message from 1 InputStream
> > > and write to 2 OutputStreams or sinks after 2 different independent
> > > stacks of operations are applied. One way to solve this is to deploy
> > > 2 independent jobs, but the downside is that it would be difficult to
> > > synchronize the 2 jobs.
> > > Is it possible to do the following:
> > >
> > > MessageStream<Map<String, Object>> ms = appDescriptor.getInputStream(kid);
> > > MessageStream msForkPoint = ms.map(mapping_logic1);
> > > msForkPoint.filter(filter_logic_1).sendTo(outputStream1);
> > > msForkPoint.map(mapping_logic2).sink(write_to_DB);
> > >
> > > Based on the source code, each operation instantiates a new instance
> > > of MessageStream and registers the new StreamOperatorSpec with the
> > > previous MessageStream instance's StreamOperatorSpec, essentially
> > > forming a "linked list" of parent-child StreamOperatorSpecs. Since
> > > each parent OperatorSpec maintains a LinkedHashSet of next
> > > OperatorSpecs, the above code forking 2 independent operation stacks
> > > after the initial map seems feasible.
> > >
> > > Regards,
> > >
> > > Eric
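To make the fan-out behavior discussed in the thread concrete, here is a small toy model, not Samza's actual classes: each operator registers itself as a child of its parent stream (mirroring the "parent OperatorSpec holds a LinkedHashSet of next OperatorSpecs" observation), so two independent chains attached to the same fork point each receive every message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model (not Samza's real API): each operator registers itself as a
// child of its parent, so one stream can fan out to several chains.
class ToyStream<M> {
    private final List<Consumer<M>> children = new ArrayList<>();

    <O> ToyStream<O> map(Function<M, O> fn) {
        ToyStream<O> child = new ToyStream<>();
        children.add(m -> child.emit(fn.apply(m)));
        return child;
    }

    ToyStream<M> filter(Predicate<M> pred) {
        ToyStream<M> child = new ToyStream<>();
        children.add(m -> { if (pred.test(m)) child.emit(m); });
        return child;
    }

    void sink(Consumer<M> consumer) {
        children.add(consumer);
    }

    void emit(M message) {
        // Every registered child sees every message: forking is fan-out.
        for (Consumer<M> c : children) {
            c.accept(message);
        }
    }
}

public class ForkDemo {
    public static void main(String[] args) {
        ToyStream<Integer> ms = new ToyStream<>();
        ToyStream<Integer> fork = ms.map(x -> x * 10);

        List<Integer> out1 = new ArrayList<>();
        List<Integer> out2 = new ArrayList<>();
        fork.filter(x -> x > 15).sink(out1::add);   // chain 1
        fork.map(x -> x + 1).sink(out2::add);       // chain 2

        ms.emit(1);
        ms.emit(2);
        // out1 == [20], out2 == [11, 21]: both chains saw every message.
    }
}
```

This mirrors why the snippet in the thread works: `filter(...).sendTo(...)` and `map(...).sink(...)` on the same `msForkPoint` register two children on one parent rather than consuming the stream twice.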
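On the group id point, Bharath notes that Samza combines job name and job id into one consumer group id for the whole job. The exact format is an implementation detail of Samza's source; this hypothetical sketch just assumes a simple `jobName-jobId` concatenation to show the idea.

```java
// Hypothetical sketch (not Samza's actual source): one consumer group id
// per job, derived from job name and job id, so every Kafka consumer in
// the same job joins the same group regardless of how streams are forked.
public class GroupIdSketch {
    static String consumerGroupId(String jobName, String jobId) {
        return jobName + "-" + jobId;
    }

    public static void main(String[] args) {
        // Different (name, id) pairs yield distinct group ids, so two
        // deployed jobs never share a group even on the same topic.
        System.out.println(consumerGroupId("page-view-counter", "1"));
    }
}
```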