> 1. Unable to remove duplicate messages in the broker. Below is my code and
> config file.

How did you send the messages and how did you verify the messages are
duplicated?

Just FYI, the broker identifies duplicates by sequence ID (tracked per
producer name), not by message content. I want to make sure the verification
method you used is correct.
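
To illustrate the point, here is a minimal Java sketch of deduplication keyed
on sequence ID. It is a simplified model under stated assumptions, not the
broker's actual implementation: the class `SequenceIdDedup` and its `accept`
method are hypothetical names, and the model only remembers the highest
sequence ID accepted for each producer name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of sequence-ID-based deduplication.
// This is NOT Pulsar's broker code; it only tracks the highest sequence ID
// accepted so far for each producer name.
class SequenceIdDedup {
    private final Map<String, Long> highestSeen = new HashMap<>();

    // Returns true if the message is accepted, false if it is treated as a
    // duplicate. The payload is deliberately never inspected.
    boolean accept(String producerName, long sequenceId, String payload) {
        Long last = highestSeen.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // sequence ID already seen for this producer
        }
        highestSeen.put(producerName, sequenceId);
        return true;
    }
}
```

In this model, the same payload sent twice with sequence IDs 0 and 1 is
accepted both times, while a different payload that reuses sequence ID 1 is
rejected. So seeing identical content twice on the consumer side does not by
itself show that deduplication failed.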

> If we produce 50000 messages for a particular topic, at the subscriber end
> we are getting fewer messages for that same topic.
> We are able to produce data asynchronously, but are not getting the same
> on the consumer side.

Two things here:

1) If you are using sendAsync, it is better to listen to the callback to
see whether the send failed. You also need to make sure you wait for all
messages to be sent before shutting down your producer instance.

2) Most likely, the messages are being dropped because the max pending
queue size was exceeded. Did you set `blockIfQueueFull` to `true`?
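
Both points can be sketched in Java as follows. This is a minimal sketch, not
a definitive implementation: `AsyncSendTracker` and `sendAll` are hypothetical
names, and the send function is passed in as a parameter so the example runs
without a broker. With the Pulsar client you would pass `producer::sendAsync`
and set `.blockIfQueueFull(true)` on the producer builder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper: sends every payload asynchronously, records failures
// through the completion callback, and waits for all sends before returning.
class AsyncSendTracker {
    // Returns the number of sends that failed.
    static <T> int sendAll(List<String> payloads,
                           Function<String, CompletableFuture<T>> sendAsync) {
        AtomicInteger failed = new AtomicInteger();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String payload : payloads) {
            CompletableFuture<Void> done = sendAsync.apply(payload)
                .handle((result, error) -> {
                    if (error != null) {
                        // The callback is the only place a failed async send shows up.
                        failed.incrementAndGet();
                        System.err.println("Send failed: " + error);
                    }
                    return null;
                });
            pending.add(done);
        }
        // Block until every send has either succeeded or failed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return failed.get();
    }
}
```

Close the producer only after `sendAll` returns, for example with
`producer.flush()` followed by `producer.close()`, instead of calling
`producer.closeAsync()` right after the send loop.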

- Sijie


On Thu, Mar 12, 2020 at 6:53 AM Tarik Afwas <[email protected]> wrote:

> Hi Team,
>
> Please help me resolve the issues below urgently:
>
>
> 1. Unable to remove duplicate messages in the broker. Below is my code and
> config file.
>
> Producer Coding Related:-
> PulsarClient pulsarClient = PulsarClient.builder()
> .serviceUrl( "pulsar://192.168.x.x:6650" )
> .build();
> Producer producer = pulsarClient.newProducer()
> .producerName( "producer-1" )
> .topic( "persistent://public/default/topic-1" )
> .sendTimeout( 0 , TimeUnit.SECONDS)
> .create();
>
> Configuration Side:-
> # Set the default behavior for message deduplication in the broker
> # This can be overridden per-namespace. If enabled, broker will reject
> # messages that were already stored in the topic
> brokerDeduplicationEnabled=true
>
> # Maximum number of producer information that it's going to be
> # persisted for deduplication purposes
> brokerDeduplicationMaxNumberOfProducers=1000
>
> # Number of entries after which a dedup info snapshot is taken.
> # A larger interval will lead to fewer snapshots being taken, though it would
> # increase the topic recovery time when the entries published after the
> # snapshot need to be replayed.
> brokerDeduplicationEntriesInterval=1000
>
> # Time of inactivity after which the broker will discard the deduplication
> # information relative to a disconnected producer. Default is 6 hours.
> brokerDeduplicationProducerInactivityTimeoutMinutes=1
>
> 2. If we produce 50000 messages for a particular topic, at the subscriber
> end we are getting fewer messages for that same topic.
> We are able to produce data asynchronously, but are not getting the same
> on the consumer side.
>
> Producer Side Code:-
> String
> instjson="[{\"iRAngle\":0,\"iYAngle\":0,\"iBAngle\":0,\"ReadTime\":\""+timeStamp+"\",\"MeterNumber\":\""+meterNumber+"\",\"ModemTime\":\""+timeStamp+"\",\"PFr\":0.827,\"PFy\":0.746,\"PFb\":0.793,\"Ir\":25.89,\"Iy\":18.64,\"Ib\":21.27,\"Vrn\":217.84,\"Vyn\":217.52,\"Vbn\":218.75,\"ThreePhasePF\":0.794,\"Frequency\":50.08,\"KVA\":14.26,\"Power_KW\":11.336,\"KVAR\":8.651,\"KWH\":0,\"KVARH_Lag\":0,\"KVARH_Lead\":0,\"KVAH\":0,\"PowerOffCount\":406,\"PowerOffDuration\":125584,\"TamperCount\":3452,\"MDResetCount\":9,\"ProgrammCount\":0,\"MDResetDate\":\"01-02-2020
> 00:00:00\",\"MDKW\":0,\"Date_MDKW\":\"\",\"MDKVA\":0,\"Date_MDKVA\":\"\",\"KWH_Exp\":0,\"KVAH_Exp\":0,\"KWH_Imp\":0,\"KVAH_Imp\":0,\"TransID\":\"8310299\",\"Imei\":\"\"}]";
>
>
>
>
> public class PulsarProducer {
> private PulsarClient client;
> public PulsarProducer() throws PulsarClientException {
> client = PulsarClient.builder()
> .serviceUrl(FilterUnit.pulserURL)
> .build();
> }
>
> public Producer<String> getProducer(String topic) throws PulsarClientException {
> return client.newProducer(Schema.STRING)
> .producerName("APP_PRODUCER")
> .topic("persistent://public/default/"+topic)
> .sendTimeout(0, TimeUnit.SECONDS)
> .create();
> }
>
> }
>
> Producer<String> producer = FilterUnit.producer.getProducer("topic");
> JSONArray recs = new JSONArray( instjson );
> for (int i = 0; i < recs.length(); i++) {
> JSONObject obj = recs.getJSONObject(i);
> producer.sendAsync(obj.toString());
> }
> producer.closeAsync();
>
> Consumer Side Code:-
>
> PulsarClient client =
> PulsarClient.builder().serviceUrl(FilterUnit.pulserURL).build();
>
> Consumer<String> consumer = client.newConsumer(Schema.STRING)
> .topic("topic")
> .subscriptionName("INSTANT")
> .subscribe();
>
> while (true) {
> // Wait for a message
> Message msg = consumer.receive();
> try {
> if (msg.getValue() != null && !msg.getValue().toString().isEmpty()) {
> System.out.println("VALUE :-" + msg.getValue() );
> }
> // Acknowledge the message so that it can be deleted by the message broker
> consumer.acknowledge(msg);
> } catch (Exception e) {
> e.printStackTrace();
> consumer.negativeAcknowledge(msg);
> }
> }
>
>
>
>
> Thanks & Regards,
> Tarik Afwas
> Software Engineer
> [ http://www.bcits.co.in/ | BCITS Pvt Ltd. ]
> Mobile : +91 7676811033
