Hi,

Based on the source code, it appears that each job designates a unique group id when subscribing to a Kafka topic; is my understanding correct? If so, since one cannot call appDescriptor.getInputStream with the same KafkaInputDescriptor twice, is it possible to apply two independent stacks of operations to the same InputStream? In essence, I have a requirement to process a message from one InputStream and write to two OutputStreams or sinks, after applying two different, independent stacks of operations. One way to solve this is to deploy two independent jobs, but the downside is that it would be difficult to keep the two jobs synchronized. Is it possible to do the following:
    MessageStream<Map<String, Object>> ms = appDescriptor.getInputStream(kid);
    MessageStream<Map<String, Object>> msForkPoint = ms.map(mapping_logic1);
    msForkPoint.filter(filter_logic_1).sendTo(outputStream1);
    msForkPoint.map(mapping_logic2).sink(write_to_DB);

Based on the source code, each operation instantiates a new MessageStream instance and registers the new StreamOperatorSpec with the previous MessageStream instance's StreamOperatorSpec, essentially forming a "linked list" of parent-child StreamOperatorSpecs. Since each parent OperatorSpec maintains a LinkedHashSet of next OperatorSpecs, the above code, which forks two independent operation stacks after the initial map, seems feasible.

Regards,
Eric
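To convince myself that the fan-out works, I put together the following minimal, self-contained sketch. To be clear, these are not Samza's actual classes; Op is a toy stand-in for an OperatorSpec node that keeps a LinkedHashSet of next operators, as described above, and forwards each message to every registered child:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Toy model of a parent-child OperatorSpec linkage: each node applies a
// function and forwards the result to every child in its LinkedHashSet.
class Op {
    private final Function<String, String> fn;
    private final Set<Op> next = new LinkedHashSet<>();
    final List<String> sink = new ArrayList<>(); // terminal output for leaf nodes

    Op(Function<String, String> fn) { this.fn = fn; }

    // Create a child operator and register it with this parent,
    // mirroring how each new MessageStream registers its spec upstream.
    Op then(Function<String, String> childFn) {
        Op child = new Op(childFn);
        next.add(child);
        return child;
    }

    void onMessage(String msg) {
        String out = fn.apply(msg);
        if (next.isEmpty()) sink.add(out);        // leaf: "write" the result
        for (Op child : next) child.onMessage(out); // fan out to all children
    }
}

public class ForkDemo {
    public static void main(String[] args) {
        Op forkPoint = new Op(s -> s.toUpperCase()); // shared initial map
        Op branch1 = forkPoint.then(s -> s + "-A");  // first independent stack
        Op branch2 = forkPoint.then(s -> s + "-B");  // second independent stack

        forkPoint.onMessage("msg"); // one input message

        // Both branches receive the forked message independently.
        System.out.println(branch1.sink); // [MSG-A]
        System.out.println(branch2.sink); // [MSG-B]
    }
}
```

Running this prints [MSG-A] and [MSG-B], i.e. both downstream stacks independently see the output of the shared map stage, which is the behavior I am hoping the real OperatorSpec graph provides.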