For debug, you can just pull data from inputStream2. sudhansu069 [via Apache Flink User Mailing List archive.] < ml+s2336050n44010...@n4.nabble.com> 于2021年5月27日周四 下午11:22写道:
> Hi Team , > > We are trying to build a data pipeline where we have to set up two > different kafka consumers for two different kafka topics and with a single > SNS sink. > Below is the sample code for the same , but looks like from one of the > sources the events are not flowing into the cluster. We are using the merge > API for merging two input sources here. > > > DataStream<Tuple2<String, AuditEvent>> inputStream1 = > env.addSource(flinkKafkaConsumer) > .uid(configParams.get(AppConstant.JOB_PUBLISHER_STATE_KAFKA_SOURCE_UUID)) > .name(AppConstant.FHIR_SOURCE); > > DataStream<Tuple2<String, AuditEvent>> inputStream2 = > env.addSource(flinkKafkaConsumerFromRejectedTopic) > .uid("testUID") > .name(AppConstant.FHIR_SOURCE_FOR_REJECTED_QUEUE); > > DataStream<Tuple2<String, AuditEvent>> allStreams = > inputStream1.union(inputStream2); > > > In the above code snippet, allStreams is only pulling events > from inputStream1 but expectation is allStreams should be pulling events > from both inputStream1 and inputStream2. Could you please help us in > understanding if this is the right approach or if we are missing something. > > Thanks, > Sudhansu > > > ------------------------------ > If you reply to this email, your message will be added to the discussion > below: > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Enable-Multiple-kafka-Consumer-sources-for-job-tp44010.html > To start a new topic under Apache Flink User Mailing List archive., email > ml+s2336050n1...@n4.nabble.com > To unsubscribe from Apache Flink User Mailing List archive., click here > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=bGl1amlhbmdhbmdwZW5nQGdtYWlsLmNvbXwxfC0xMTYwNzM3MjI=> > . > NAML > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> >