Hi Team,

We are trying to build a data pipeline where we need to set up two
Kafka consumers for two different Kafka topics, feeding a single SNS
sink. Below is sample code for this, but it looks like events from one
of the sources are not flowing into the job. We are using the union API
to merge the two input sources here.


// Consumer for the primary (FHIR) topic
DataStream<Tuple2<String, AuditEvent>> inputStream1 = env
    .addSource(flinkKafkaConsumer)
    .uid(configParams.get(AppConstant.JOB_PUBLISHER_STATE_KAFKA_SOURCE_UUID))
    .name(AppConstant.FHIR_SOURCE);

// Consumer for the rejected-events topic
DataStream<Tuple2<String, AuditEvent>> inputStream2 = env
    .addSource(flinkKafkaConsumerFromRejectedTopic)
    .uid("testUID")
    .name(AppConstant.FHIR_SOURCE_FOR_REJECTED_QUEUE);

// union() merges both streams into a single DataStream
DataStream<Tuple2<String, AuditEvent>> allStreams =
    inputStream1.union(inputStream2);
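
For completeness, allStreams is then handed to the single SNS sink
roughly as below (our SNS sink is a custom SinkFunction; the sink
variable, uid, and name here are illustrative, not our actual values):

// Schematic only: snsSinkFunction is our custom SinkFunction for SNS,
// and the uid/name values are placeholders
allStreams
    .addSink(snsSinkFunction)
    .uid("sns-sink-uid")
    .name("SNS_SINK");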


In the snippet above, allStreams only pulls events from inputStream1,
but our expectation is that allStreams should pull events from both
inputStream1 and inputStream2. Could you please help us understand
whether this is the right approach, or whether we are missing something?
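
To isolate the problem, below is a minimal, self-contained sketch that
we would expect to print events from both topics. The broker address,
topic names, group id, and class name are all placeholders, and it reads
plain strings instead of our Tuple2<String, AuditEvent> records:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UnionSmokeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "union-smoke-test");        // placeholder group id

        // Read raw strings from both topics, starting from the earliest
        // offsets so pre-existing test events are picked up as well
        FlinkKafkaConsumer<String> mainConsumer =
            new FlinkKafkaConsumer<>("main-topic", new SimpleStringSchema(), props);
        mainConsumer.setStartFromEarliest();

        FlinkKafkaConsumer<String> rejectedConsumer =
            new FlinkKafkaConsumer<>("rejected-topic", new SimpleStringSchema(), props);
        rejectedConsumer.setStartFromEarliest();

        DataStream<String> s1 = env.addSource(mainConsumer).name("main-source");
        DataStream<String> s2 = env.addSource(rejectedConsumer).name("rejected-source");

        // If union works as expected, events from BOTH topics print here
        s1.union(s2).print();

        env.execute("union-smoke-test");
    }
}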

Thanks,
Sudhansu
