Hi,

Based on the source code, it appears that each job designates a unique
group id when subscribing to a Kafka topic; is my understanding correct?  If
so, since one cannot call appDescriptor.getInputStream with the
same KafkaInputDescriptor twice, is it possible to have 2 independent stacks
of operations applied on the same InputStream?  In essence, I have a
requirement to process a message from 1 InputStream and write to 2
OutputStreams or sinks after applying 2 different independent stacks of
operations.  One way to solve this is to deploy 2 independent jobs, but the
downside is that it would be difficult to synchronize the 2 jobs.  Is it
possible to do the following:

MessageStream<Map<String, Object>> ms = appDescriptor.getInputStream(kid);
MessageStream<Map<String, Object>> msForkPoint = ms.map(mapping_logic1);
msForkPoint.filter(filter_logic_1).sendTo(outputStream1);
msForkPoint.map(mapping_logic2).sink(write_to_DB);

Based on the source code, each operation instantiates a new
MessageStream and registers the new StreamOperatorSpec with the previous
MessageStream instance's StreamOperatorSpec, essentially forming a "linked
list" of parent-child StreamOperatorSpecs.  Since each parent OperatorSpec
maintains a LinkedHashSet of next OperatorSpecs, the above code, which forks
2 independent operation stacks after the initial map, seems feasible.
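To make the fan-out idea concrete, here is a minimal, self-contained sketch of that parent-child structure. Note this is an illustrative model, not Samza's actual OperatorSpec classes: each node keeps a LinkedHashSet of children, so registering two downstream chains on the same fork point sends every mapped message down both chains independently.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class FanOutSketch {
    // Hypothetical stand-in for an OperatorSpec: a transform plus a
    // LinkedHashSet of child operators, mirroring the parent-child
    // "linked list" (really a tree once it forks) described above.
    static class Op {
        final Function<String, String> transform;
        final Set<Op> children = new LinkedHashSet<>();

        Op(Function<String, String> transform) {
            this.transform = transform;
        }

        // Registering a new operator adds it as a child of this one and
        // returns it, so chains can be built fluently, like MessageStream.
        Op then(Function<String, String> f) {
            Op child = new Op(f);
            children.add(child);
            return child;
        }

        // Propagate a message down every registered child chain;
        // leaves append their result to 'outputs'.
        void process(String msg, List<String> outputs) {
            String out = transform.apply(msg);
            if (children.isEmpty()) {
                outputs.add(out);
            } else {
                for (Op child : children) {
                    child.process(out, outputs);
                }
            }
        }
    }

    public static void main(String[] args) {
        Op root = new Op(Function.identity());
        Op fork = root.then(String::toUpperCase);  // shared mapping_logic1
        fork.then(s -> s + "->stream1");           // first independent chain
        fork.then(s -> s + "->db");                // second independent chain

        List<String> outputs = new ArrayList<>();
        root.process("msg", outputs);
        System.out.println(outputs); // [MSG->stream1, MSG->db]
    }
}
```

Because both chains hang off the same fork node, the shared map runs once per message and each chain then proceeds independently, which matches the behavior the code snippet above relies on.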

Regards,

Eric
