Hi Team,

We are trying to build a data pipeline with two Kafka consumers, one for each of two Kafka topics, feeding a single SNS sink. Below is the sample code. It looks like events from one of the sources are not flowing into the cluster. We are using the union API to merge the two input sources.

    // Source 1: main Kafka topic
    DataStream<Tuple2<String, AuditEvent>> inputStream1 = env.addSource(flinkKafkaConsumer)
            .uid(configParams.get(AppConstant.JOB_PUBLISHER_STATE_KAFKA_SOURCE_UUID))
            .name(AppConstant.FHIR_SOURCE);

    // Source 2: rejected-events Kafka topic
    DataStream<Tuple2<String, AuditEvent>> inputStream2 = env.addSource(flinkKafkaConsumerFromRejectedTopic)
            .uid("testUID")
            .name(AppConstant.FHIR_SOURCE_FOR_REJECTED_QUEUE);

    // Merge both sources into a single stream for the sink
    DataStream<Tuple2<String, AuditEvent>> allStreams = inputStream1.union(inputStream2);

In the above snippet, allStreams is only pulling events from inputStream1, but we expect it to pull events from both inputStream1 and inputStream2.

Could you please help us understand whether this is the right approach, or whether we are missing something?

Thanks,
Sudhansu