Hi Team,

Please help me resolve the two issues below urgently:

1. We are unable to remove duplicate messages in the broker. Our producer code and broker configuration are below.

Producer code:

    PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl("pulsar://192.168.x.x:6650")
            .build();

    Producer<byte[]> producer = pulsarClient.newProducer()
            .producerName("producer-1")
            .topic("persistent://public/default/topic-1")
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();

Broker configuration:

    # Set the default behavior for message deduplication in the broker
    # This can be overridden per-namespace. If enabled, broker will reject
    # messages that were already stored in the topic
    brokerDeduplicationEnabled=true

    # Maximum number of producer information that it's going to be
    # persisted for deduplication purposes
    brokerDeduplicationMaxNumberOfProducers=1000

    # Number of entries after which a dedup info snapshot is taken.
    # A larger interval will lead to fewer snapshots being taken, though it would
    # increase the topic recovery time when the entries published after the
    # snapshot need to be replayed.
    brokerDeduplicationEntriesInterval=1000

    # Time of inactivity after which the broker will discard the deduplication information
    # relative to a disconnected producer. Default is 6 hours.
    brokerDeduplicationProducerInactivityTimeoutMinutes=1

2. If we produce 50000 messages to a particular topic, the subscriber receives fewer messages from that same topic.
We are able to produce data asynchronously, but we do not receive the same number of messages on the consumer side.

Producer-side code:

    String instjson="[{\"iRAngle\":0,\"iYAngle\":0,\"iBAngle\":0,\"ReadTime\":\""+timeStamp+"\",\"MeterNumber\":\""+meterNumber+"\",\"ModemTime\":\""+timeStamp+"\",\"PFr\":0.827,\"PFy\":0.746,\"PFb\":0.793,\"Ir\":25.89,\"Iy\":18.64,\"Ib\":21.27,\"Vrn\":217.84,\"Vyn\":217.52,\"Vbn\":218.75,\"ThreePhasePF\":0.794,\"Frequency\":50.08,\"KVA\":14.26,\"Power_KW\":11.336,\"KVAR\":8.651,\"KWH\":0,\"KVARH_Lag\":0,\"KVARH_Lead\":0,\"KVAH\":0,\"PowerOffCount\":406,\"PowerOffDuration\":125584,\"TamperCount\":3452,\"MDResetCount\":9,\"ProgrammCount\":0,\"MDResetDate\":\"01-02-2020 00:00:00\",\"MDKW\":0,\"Date_MDKW\":\"\",\"MDKVA\":0,\"Date_MDKVA\":\"\",\"KWH_Exp\":0,\"KVAH_Exp\":0,\"KWH_Imp\":0,\"KVAH_Imp\":0,\"TransID\":\"8310299\",\"Imei\":\"\"}]";

    public class PulsarProducer {
        private PulsarClient client;

        public PulsarProducer() throws PulsarClientException {
            client = PulsarClient.builder()
                    .serviceUrl(FilterUnit.pulserURL)
                    .build();
        }

        public Producer<String> getProducer(String topic) throws PulsarClientException {
            return client.newProducer(Schema.STRING)
                    .producerName("APP_PRODUCER")
                    .topic("persistent://public/default/" + topic)
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .create();
        }
    }

    Producer<String> producer = FilterUnit.producer.getProducer("topic");
    JSONArray recs = new JSONArray(instjson);
    for (int i = 0; i < recs.length(); i++) {
        JSONObject obj = recs.getJSONObject(i);
        // The future returned by sendAsync is not checked here
        producer.sendAsync(obj.toString());
    }
    producer.closeAsync();

Consumer-side code:

    PulsarClient client = PulsarClient.builder()
            .serviceUrl(FilterUnit.pulserURL)
            .build();

    Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("topic")
            .subscriptionName("INSTANT")
            .subscribe();

    while (true) {
        // Wait for a message
        Message<String> msg = consumer.receive();
        try {
            if (msg.getValue() != null && !msg.getValue().isEmpty()) {
                System.out.println("VALUE :-" + msg.getValue());
            }
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        } catch (Exception e) {
            e.printStackTrace();
            consumer.negativeAcknowledge(msg);
        }
    }

Thanks & Regards,
Tarik Afwas
Software Engineer
BCITS Pvt Ltd. (http://www.bcits.co.in/)
Mobile: +91 7676811033
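
One thing that could be checked in the producer code above is whether every sendAsync call has completed before closeAsync is called, since the returned futures are currently ignored. A minimal sketch of such a check follows, using only the Java standard library. AsyncSendTracker, Result and the sendAsync function parameter are hypothetical names introduced for illustration; the function stands in for Producer.sendAsync so the example runs without a broker. It is a diagnostic aid under those assumptions, not a confirmed fix for the missing messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Sketch only: "sendAsync" is a stand-in for Producer.sendAsync(...),
// passed in as a function so the example does not need a Pulsar broker.
final class AsyncSendTracker {

    /** Outcome of a batch: how many sends succeeded and how many failed. */
    static final class Result {
        final int succeeded;
        final int failed;

        Result(int succeeded, int failed) {
            this.succeeded = succeeded;
            this.failed = failed;
        }
    }

    /**
     * Sends every payload through the given async function, then blocks
     * until each returned future has completed, counting the outcomes.
     */
    static <T> Result sendAllAndWait(List<T> payloads,
                                     Function<T, CompletableFuture<?>> sendAsync) {
        AtomicInteger ok = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        List<CompletableFuture<Void>> pending = new ArrayList<>();

        for (T payload : payloads) {
            // handle(...) turns a failed send into a normal completion,
            // so one failure does not hide the outcome of the others.
            CompletableFuture<Void> tracked = sendAsync.apply(payload).handle((id, err) -> {
                if (err == null) {
                    ok.incrementAndGet();
                } else {
                    failed.incrementAndGet();
                }
                return null;
            });
            pending.add(tracked);
        }

        // Wait for all sends to finish before the caller closes the producer.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        return new Result(ok.get(), failed.get());
    }
}
```

With a real Producer<String>, the call would presumably look like AsyncSendTracker.sendAllAndWait(payloads, producer::sendAsync), followed by producer.close(). Comparing succeeded + failed against the number of payloads, and against the count seen by the consumer, should show whether messages are lost on the producer side or later.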