Hi Team,
Please help me resolve the two issues below urgently:
1. I am unable to remove duplicate messages in the broker. Below are my code
and config file.
Producer code:
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://192.168.x.x:6650")
        .build();
// newProducer() without a schema produces raw byte[] messages
Producer<byte[]> producer = pulsarClient.newProducer()
        .producerName("producer-1")
        .topic("persistent://public/default/topic-1")
        .sendTimeout(0, TimeUnit.SECONDS)
        .create();
Broker configuration:
# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
brokerDeduplicationEnabled=true
# Maximum number of producer information that it's going to be
# persisted for deduplication purposes
brokerDeduplicationMaxNumberOfProducers=1000
# Number of entries after which a dedup info snapshot is taken.
# A larger interval will lead to fewer snapshots being taken, though it would
# increase the topic recovery time when the entries published after the
# snapshot need to be replayed.
brokerDeduplicationEntriesInterval=1000
# Time of inactivity after which the broker will discard the deduplication
# information relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=1
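For context on the settings above, here is a minimal sketch of how sequence-ID based deduplication behaves, assuming the broker tracks the highest sequence ID per producer name and drops anything at or below it. The class `DedupSketch` and its `offer` method are hypothetical names for illustration only. This is a simplified model, not Pulsar's broker code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of sequence-ID based deduplication (illustrative only).
class DedupSketch {
    // Highest sequence ID accepted so far, keyed by producer name.
    private final Map<String, Long> lastSequenceId = new HashMap<>();

    // Returns true if the message is stored, false if it is dropped as a duplicate.
    boolean offer(String producerName, long sequenceId) {
        Long last = lastSequenceId.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // already seen from this producer: treated as a duplicate
        }
        lastSequenceId.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        DedupSketch broker = new DedupSketch();
        System.out.println(broker.offer("producer-1", 0)); // first send: true
        System.out.println(broker.offer("producer-1", 0)); // retry of the same send: false
        System.out.println(broker.offer("producer-1", 1)); // same payload sent as a new message: true
    }
}
```

Under this model, a retransmission of the same send is dropped, while the same payload published again as a new message receives a new sequence ID and is stored. If the broker behaves this way, duplicates created by the application itself would not be removed by this setting.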
2. If we produce 50,000 messages to a particular topic, the subscriber receives
fewer messages on that same topic.
We are able to produce data asynchronously, but not all of it arrives on the
consumer side.
Producer code:
String instjson = "[{\"iRAngle\":0,\"iYAngle\":0,\"iBAngle\":0,"
        + "\"ReadTime\":\"" + timeStamp + "\",\"MeterNumber\":\"" + meterNumber + "\","
        + "\"ModemTime\":\"" + timeStamp + "\","
        + "\"PFr\":0.827,\"PFy\":0.746,\"PFb\":0.793,"
        + "\"Ir\":25.89,\"Iy\":18.64,\"Ib\":21.27,"
        + "\"Vrn\":217.84,\"Vyn\":217.52,\"Vbn\":218.75,"
        + "\"ThreePhasePF\":0.794,\"Frequency\":50.08,\"KVA\":14.26,"
        + "\"Power_KW\":11.336,\"KVAR\":8.651,\"KWH\":0,"
        + "\"KVARH_Lag\":0,\"KVARH_Lead\":0,\"KVAH\":0,"
        + "\"PowerOffCount\":406,\"PowerOffDuration\":125584,\"TamperCount\":3452,"
        + "\"MDResetCount\":9,\"ProgrammCount\":0,\"MDResetDate\":\"01-02-2020 00:00:00\","
        + "\"MDKW\":0,\"Date_MDKW\":\"\",\"MDKVA\":0,\"Date_MDKVA\":\"\","
        + "\"KWH_Exp\":0,\"KVAH_Exp\":0,\"KWH_Imp\":0,\"KVAH_Imp\":0,"
        + "\"TransID\":\"8310299\",\"Imei\":\"\"}]";
public class PulsarProducer {
    private PulsarClient client;

    public PulsarProducer() throws PulsarClientException {
        client = PulsarClient.builder()
                .serviceUrl(FilterUnit.pulserURL)
                .build();
    }

    public Producer<String> getProducer(String topic) throws PulsarClientException {
        return client.newProducer(Schema.STRING)
                .producerName("APP_PRODUCER")
                .topic("persistent://public/default/" + topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
    }
}
Producer<String> producer = FilterUnit.producer.getProducer("topic");
JSONArray recs = new JSONArray(instjson);
for (int i = 0; i < recs.length(); i++) {
    JSONObject obj = recs.getJSONObject(i);
    producer.sendAsync(obj.toString());
}
producer.closeAsync();
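One possible cause of the missing messages, offered as an assumption rather than a confirmed diagnosis, is that the producer is closed while asynchronous sends are still pending. In the Pulsar Java client, `sendAsync` returns a `CompletableFuture`, so the futures can be collected and awaited before closing. The sketch below shows that pattern with a stand-in send function so it runs without a broker. `SendAllSketch`, `sendAllAndWait`, and `fakeSend` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SendAllSketch {
    // Sends every payload through sendAsync and blocks until all sends have completed.
    // sendAsync stands in for a call such as producer::sendAsync (assumed wiring).
    static <R> int sendAllAndWait(List<String> payloads,
                                  Function<String, CompletableFuture<R>> sendAsync) {
        List<CompletableFuture<R>> pending = new ArrayList<>();
        for (String payload : payloads) {
            pending.add(sendAsync.apply(payload));
        }
        // join() throws a CompletionException if any individual send failed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return pending.size();
    }

    public static void main(String[] args) {
        // Fake sender that completes on another thread, to mimic an async client.
        Function<String, CompletableFuture<String>> fakeSend =
                p -> CompletableFuture.supplyAsync(() -> "ack:" + p);
        int sent = sendAllAndWait(Arrays.asList("a", "b", "c"), fakeSend);
        System.out.println("acknowledged sends: " + sent);
        // Only after this point would the producer be closed.
    }
}
```

With a real producer, the same effect might be achieved by calling `producer.flush()` before closing. Attaching a failure handler to each future would also reveal whether any sends are being rejected.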
Consumer code:
PulsarClient client = PulsarClient.builder()
        .serviceUrl(FilterUnit.pulserURL)
        .build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("topic")
        .subscriptionName("INSTANT")
        .subscribe();
while (true) {
    // Wait for a message
    Message<String> msg = consumer.receive();
    try {
        if (msg.getValue() != null && !msg.getValue().isEmpty()) {
            System.out.println("VALUE :-" + msg.getValue());
        }
        // Acknowledge the message so that it can be deleted by the message broker
        consumer.acknowledge(msg);
    } catch (Exception e) {
        e.printStackTrace();
        // Ask the broker to redeliver this message later
        consumer.negativeAcknowledge(msg);
    }
}
Thanks & Regards,
Tarik Afwas
Software Engineer
BCITS Pvt Ltd. (http://www.bcits.co.in/)
Mobile : +91 7676811033