Hi Team, 

Please help me resolve the topics below urgently:


1. We are unable to remove duplicate messages in the broker. Below are my code
and config file.

Producer Code:-
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://192.168.x.x:6650")
        .build();
// newProducer() without a schema produces raw bytes
Producer<byte[]> producer = pulsarClient.newProducer()
        .producerName("producer-1")
        .topic("persistent://public/default/topic-1")
        .sendTimeout(0, TimeUnit.SECONDS)
        .create();

Configuration Side:- 
# Set the default behavior for message deduplication in the broker 
# This can be overridden per-namespace. If enabled, broker will reject 
# messages that were already stored in the topic 
brokerDeduplicationEnabled=true 

# Maximum number of producer information that it's going to be 
# persisted for deduplication purposes 
brokerDeduplicationMaxNumberOfProducers=1000 

# Number of entries after which a dedup info snapshot is taken. 
# A larger interval will lead to fewer snapshots being taken, though it would 
# increase the topic recovery time when the entries published after the 
# snapshot need to be replayed. 
brokerDeduplicationEntriesInterval=1000 

# Time of inactivity after which the broker will discard the deduplication
# information relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=1 
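
For reference, our understanding (which may be wrong) is that the broker deduplicates on producer name plus sequence ID, not on the message payload, so two sends with the same content but different sequence IDs would both be stored. Below is a toy model of that rule. It is not Pulsar's actual implementation, and the class name DedupSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-producer sequence-ID deduplication (assumed rule, not Pulsar code). */
class DedupSketch {
    // Highest sequence ID accepted so far, keyed by producer name.
    private final Map<String, Long> lastSequenceId = new HashMap<>();

    /** Returns true if the message is accepted, false if it is treated as a duplicate. */
    boolean accept(String producerName, long sequenceId) {
        Long last = lastSequenceId.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // sequence ID already seen for this producer name
        }
        lastSequenceId.put(producerName, sequenceId);
        return true;
    }
}
```

Under this model, a resend of sequence ID 1 from "producer-1" is rejected, while the same payload sent again with sequence ID 2 is accepted.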

2. If we produce 50000 messages to a particular topic, the subscriber receives
fewer messages for that same topic.
We are able to produce the data asynchronously, but we do not receive all of it
on the consumer side.

Producer Side Code:- 
String instjson = "[{\"iRAngle\":0,\"iYAngle\":0,\"iBAngle\":0,\"ReadTime\":\"" + timeStamp
        + "\",\"MeterNumber\":\"" + meterNumber
        + "\",\"ModemTime\":\"" + timeStamp
        + "\",\"PFr\":0.827,\"PFy\":0.746,\"PFb\":0.793,\"Ir\":25.89,\"Iy\":18.64,\"Ib\":21.27,"
        + "\"Vrn\":217.84,\"Vyn\":217.52,\"Vbn\":218.75,\"ThreePhasePF\":0.794,\"Frequency\":50.08,"
        + "\"KVA\":14.26,\"Power_KW\":11.336,\"KVAR\":8.651,\"KWH\":0,\"KVARH_Lag\":0,\"KVARH_Lead\":0,"
        + "\"KVAH\":0,\"PowerOffCount\":406,\"PowerOffDuration\":125584,\"TamperCount\":3452,"
        + "\"MDResetCount\":9,\"ProgrammCount\":0,\"MDResetDate\":\"01-02-2020 00:00:00\","
        + "\"MDKW\":0,\"Date_MDKW\":\"\",\"MDKVA\":0,\"Date_MDKVA\":\"\",\"KWH_Exp\":0,\"KVAH_Exp\":0,"
        + "\"KWH_Imp\":0,\"KVAH_Imp\":0,\"TransID\":\"8310299\",\"Imei\":\"\"}]";
 



public class PulsarProducer {
    private PulsarClient client;

    public PulsarProducer() throws PulsarClientException {
        client = PulsarClient.builder()
                .serviceUrl(FilterUnit.pulserURL)
                .build();
    }

    public Producer<String> getProducer(String topic) throws PulsarClientException {
        return client.newProducer(Schema.STRING)
                .producerName("APP_PRODUCER")
                .topic("persistent://public/default/" + topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
    }
}

Producer<String> producer = FilterUnit.producer.getProducer("topic"); 
JSONArray recs = new JSONArray( instjson ); 
for (int i = 0; i < recs.length(); i++) {
    JSONObject obj = recs.getJSONObject(i);
    producer.sendAsync(obj.toString());
}
producer.closeAsync(); 
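
To rule out sends that never complete, the futures returned by sendAsync (a CompletableFuture<MessageId> in the Pulsar Java client) could be collected and awaited before the producer is closed. Below is a minimal sketch of the counting part only, written against plain CompletableFuture so it runs without a broker. SendTracker and awaitAndCountSuccesses are hypothetical names, not part of the Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: waits for async results and counts how many succeeded. */
class SendTracker {
    static <T> int awaitAndCountSuccesses(List<CompletableFuture<T>> futures) {
        int succeeded = 0;
        for (CompletableFuture<T> future : futures) {
            try {
                future.join(); // blocks until this future completes
                succeeded++;
            } catch (RuntimeException e) {
                // join() throws CompletionException or CancellationException on failure;
                // the failed send is simply not counted here.
            }
        }
        return succeeded;
    }
}
```

In the loop above, each producer.sendAsync(...) result would be added to the list, and the returned count compared with recs.length() before closing the producer.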

Consumer Side Code:- 

PulsarClient client = PulsarClient.builder().serviceUrl(FilterUnit.pulserURL).build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("topic")
        .subscriptionName("INSTANT")
        .subscribe();

while (true) {
    // Wait for a message
    Message<String> msg = consumer.receive();
    try {
        if (msg.getValue() != null && !msg.getValue().isEmpty()) {
            System.out.println("VALUE :-" + msg.getValue());
        }
        // Acknowledge the message so that it can be deleted by the message broker
        consumer.acknowledge(msg);
    } catch (Exception e) {
        e.printStackTrace();
        // Ask the broker to redeliver the message
        consumer.negativeAcknowledge(msg);
    }
}




Thanks & Regards, 
Tarik Afwas 
Software Engineer 
[ http://www.bcits.co.in/ | BCITS Pvt Ltd. ] 
Mobile : +91 7676811033 