I cannot, because some messages are high priority. Setting the poll
timeout to 4 seconds means there might be a delay of 4 seconds plus the
regular processing time, which is not desirable.

Also, will it impact heartbeating?
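
To make the timing question concrete: as far as I understand the consumer
API, the value passed to poll() is an upper bound on how long the call blocks
when no records are available, and the call returns earlier once records
arrive. If that holds, the 4-second worst case applies to an idle topic
rather than to every message. The sketch below is only an analogy that runs
without the Kafka client: a BlockingQueue stands in for the consumer's fetch
buffer, and PollTimeoutDemo and timedPollMillis are made-up names, not Kafka
APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: the queue stands in for the consumer's fetch buffer.
// PollTimeoutDemo and timedPollMillis are illustrative names, not Kafka APIs.
final class PollTimeoutDemo {

    // Performs one timed poll and returns how many milliseconds it actually blocked.
    static long timedPollMillis(BlockingQueue<String> buffer, long timeoutMs) {
        long start = System.nanoTime();
        try {
            buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        // A "producer" makes one message available after roughly 200 ms.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
                buffer.add("otp-message");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long waited = timedPollMillis(buffer, 4000);
        producer.join();
        System.out.println("blocked for about " + waited + " ms despite the 4000 ms timeout");
    }
}
```

Whether the real consumer behaves the same way in this setup would still need
to be confirmed against the cluster, since fetch.min.bytes and
fetch.max.wait.ms also influence when a fetch returns.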

On Tue, May 29, 2018 at 6:17 PM M. Manna <manme...@gmail.com> wrote:

> Have you tried increasing the poll timeout, e.g. to 4000, to see if that
> helps?
>
> On 29 May 2018 at 13:44, Shantanu Deshmukh <shantanu...@gmail.com> wrote:
>
> > Here is the code which is consuming messages:
> >
> > >>>>>>>>
> > while (!startShutdown) {
> >     Context context = new Context();
> >     JSONObject notifJSON = new JSONObject();
> >     String notificationMsg = "";
> >     NotificationEvent notifEvent = null;
> >     initializeContext();
> >     try {
> >         consumerConnect();
> >         ConsumerRecords<String, String> records = consumer.poll(100);
> >         if(records.count() == 0) {
> >             //logger.trace("No records in topic: "+this.topic);
> >             continue;
> >         }
> >         for(ConsumerRecord<String, String> record : records) {
> >             try {
> >                 long totalStart = System.currentTimeMillis();
> >                 notificationMsg = record.value();
> >                 JSONParser jsonParser = new JSONParser();
> >                 logger.trace("Kafka-Msg: >>"+notificationMsg);
> >                 if(notificationMsg.equals("")) {
> >                     continue;
> >                 }
> >                 Profiler.start(workerId, "json-parse");
> >                 notifJSON = (JSONObject) jsonParser.parse(notificationMsg);
> >                 Profiler.end(workerId, "json-parse");
> >                 notifEvent    = new NotificationEvent(notifJSON);
> >                 if( notifEvent.getTransactionID().equals("") == true ) {
> >                     notifEvent.generateTransactionID();
> >                 }
> >                 context.setEventObject(notifEvent);
> >                 updateContext(context);
> >
> >                 //======== Fetch template ==========//
> >                 Profiler.start(workerId, "tpl-fetch");
> >                 long start = System.currentTimeMillis();
> >                 Template template = notifTplMngr.fetchTemplate(notifEvent);
> >                 logger.trace("fetch-tpl:"+(System.currentTimeMillis()-start));
> >                 Profiler.end(workerId, "tpl-fetch");
> >
> >                 //======== Personalise template ==========//
> >                 Profiler.start(workerId, "personalisation");
> >                 start = System.currentTimeMillis();
> >                 String message = NotificationTemplatePersonaliser.personaliseAuto(template, notifEvent);
> >                 notifEvent.setMaskedMessage(NotificationTemplatePersonaliser.getMaskedContent(template, notifEvent));
> >                 logger.trace("personalise:"+(System.currentTimeMillis()-start));
> >                 Profiler.end(workerId, "personalisation");
> >
> >                 context.setEventObject(notifEvent);
> >                 updateContext(context);
> >
> >                 //======== Send notification==========//
> >                 Profiler.start(workerId, "notif-dispatch");
> >                 postOffice.sendNotification(message, notifEvent);
> >                 Profiler.end(workerId, "notif-dispatch");
> >
> >                 retryCount = 0;
> >                 logger.debug("Time to complete notification dispatch :"+(System.currentTimeMillis()-totalStart));
> >                 if(startShutdown == true) {
> >                     break;
> >                 }
> >             } catch (Exception ex) {
> >                 if(ex instanceof RetriableException) {
> >                     kafkaLogger.error(ex);
> >                     logger.warn("",ex);
> >                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_RETRIABLE_FAILURE);
> >                 } else if(ex instanceof InvalidEventException) {
> >
> >                     JsonLog jsonLog = new JsonLog();
> >                     jsonLog.setDescription("Invalid event message. Reason: "+ex.getMessage());
> >                     jsonLog.setOriginalPayload(notificationMsg);
> >                     jsonLog.setEventType("ERROR");
> >                     jsonLog.setCode("InvalidEventException");
> >                     jsonLog.setComponent(kafkaLogger.getSourceClass(ex));
> >                     jsonLog.setSubComponent(notifEvent.getChannelName());
> >                     kafkaLogger.log(jsonLog);
> >                     //kafkaLogger.error(ex);
> >                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
> >                     logger.warn("Invalid event message. Reason: "+ex.getMessage());
> >
> >                 } else if(ex instanceof EventFailedException) {
> >                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
> >                     kafkaLogger.error(ex);
> >                     logger.warn("Notification event failed. Reason: "+ex.getMessage());
> >
> >                 } else if(ex instanceof org.json.simple.parser.ParseException) {
> >                     kafkaLogger.error("Exception while parsing notification JSON message.");
> >                     logger.warn("Exception while parsing notification JSON message.");
> >                 } else {
> >                     kafkaLogger.error(ex);
> >                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
> >                     logger.warn("",ex);
> >                 }
> >             } finally {
> >                 eventsProcessed++;
> >             }
> >         }
> >     } catch (Exception ex) {
> >         kafkaLogger.error(ex);
> >         addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
> >         logger.warn("",ex);
> >     }
> > }
> > <<<<<<<<<<
> >
> > And here are server properties.
> >
> > broker.id=0
> > port=9092
> > delete.topic.enable=true
> > message.max.bytes=1500000
> > listeners=SSL://x.x.x.x:9092
> > advertised.listeners=SSL://x.x.x.x:9092
> > num.network.threads=3
> > num.io.threads=8
> > socket.send.buffer.bytes=102400
> > socket.receive.buffer.bytes=102400
> > socket.request.max.bytes=104857600
> > log.dirs=/lotus/kafka-logs
> > num.partitions=3
> > auto.topic.creation.enable=false
> > num.recovery.threads.per.data.dir=1
> > log.retention.hours=168
> > log.segment.bytes=1073741824
> > log.retention.check.interval.ms=300000
> > ssl.keystore.location=/opt/kafka/certificates/kafka.keystore.jks
> > ssl.keystore.password=xxxx
> > ssl.key.password=xxxx
> > ssl.truststore.location=/opt/kafka/certificates/kafka.truststore.jks
> > ssl.truststore.password=xxxx
> > security.inter.broker.protocol=SSL
> > zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
> > zookeeper.connection.timeout.ms=6000
> >
> > On Tue, May 29, 2018 at 5:59 PM M. Manna <manme...@gmail.com> wrote:
> >
> > > Thanks..
> > >
> > > Where is your consumer code that is consuming messages?
> > >
> > > On 29 May 2018 at 13:18, Shantanu Deshmukh <shantanu...@gmail.com> wrote:
> > >
> > > > No problem, here are consumer properties
> > > > ---------
> > > > auto.commit.interval.ms = 3000
> > > > auto.offset.reset = latest
> > > > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> > > > check.crcs = true
> > > > client.id =
> > > > connections.max.idle.ms = 540000
> > > > enable.auto.commit = true
> > > > exclude.internal.topics = true
> > > > fetch.max.bytes = 52428800
> > > > fetch.max.wait.ms = 500
> > > > fetch.min.bytes = 1
> > > > group.id = otp-notifications-consumer
> > > > heartbeat.interval.ms = 3000
> > > > interceptor.classes = null
> > > > key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> > > > max.partition.fetch.bytes = 1048576
> > > > max.poll.interval.ms = 300000
> > > > max.poll.records = 5
> > > > metadata.max.age.ms = 300000
> > > > metric.reporters = []
> > > > metrics.num.samples = 2
> > > > metrics.sample.window.ms = 30000
> > > > partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> > > > receive.buffer.bytes = 65536
> > > > reconnect.backoff.ms = 50
> > > > request.timeout.ms = 305000
> > > > retry.backoff.ms = 100
> > > > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > > > sasl.kerberos.min.time.before.relogin = 60000
> > > > sasl.kerberos.service.name = null
> > > > sasl.kerberos.ticket.renew.jitter = 0.05
> > > > sasl.kerberos.ticket.renew.window.factor = 0.8
> > > > sasl.mechanism = GSSAPI
> > > > security.protocol = SSL
> > > > send.buffer.bytes = 131072
> > > > session.timeout.ms = 300000
> > > > ssl.cipher.suites = null
> > > > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > > > ssl.endpoint.identification.algorithm = null
> > > > ssl.key.password = null
> > > > ssl.keymanager.algorithm = SunX509
> > > > ssl.keystore.location = null
> > > > ssl.keystore.password = null
> > > > ssl.keystore.type = JKS
> > > > ssl.protocol = TLS
> > > > ssl.provider = null
> > > > ssl.secure.random.implementation = null
> > > > ssl.trustmanager.algorithm = PKIX
> > > > ssl.truststore.location = ****
> > > > ssl.truststore.password = [hidden]
> > > > ssl.truststore.type = JKS
> > > > value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> > > > ------------
> > > >
> > > > On Tue, May 29, 2018 at 5:36 PM M. Manna <manme...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > It's not possible to answer this from a description alone. You need to
> > > > > share your consumer.properties and server.properties files, and also
> > > > > what exactly you have changed from the default configuration.
> > > > >
> > > > >
> > > > >
> > > > > On 29 May 2018 at 12:51, Shantanu Deshmukh <shantanu...@gmail.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > We have a 3-broker Kafka 0.10.0.1 cluster. We have 5 topics, each
> > > > > > with 10 partitions. I have an application which consumes from all
> > > > > > these topics by creating multiple consumer processes. All of these
> > > > > > consumers are in the same consumer group. I am noticing that every
> > > > > > time we restart this application, it takes almost 5 minutes for the
> > > > > > consumers to start consuming. What might be going wrong?
> > > > > >
> > > > >
> > > >
> > >
> >
>
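
One possible explanation worth checking, assuming the old consumer processes
are stopped without calling close(): session.timeout.ms in the quoted consumer
properties is 300000 ms, i.e. 5 minutes, which matches the observed delay. As
far as I know, a consumer that exits without close() remains a group member
until its session times out, and the rebalance for the restarted consumers can
be held up until then. The sketch below shows the shape of a loop that always
closes the consumer on the way out. GroupConsumer and GracefulWorker are
hypothetical names so that the example runs without the Kafka client; poll and
close mirror the KafkaConsumer methods of the same names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the two KafkaConsumer methods this sketch relies on.
interface GroupConsumer {
    List<String> poll(long timeoutMs);
    void close();
}

// Poll loop that leaves the consumer group explicitly when it stops.
final class GracefulWorker implements Runnable {
    private final GroupConsumer consumer;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    final List<String> processed = new ArrayList<>();

    GracefulWorker(GroupConsumer consumer) {
        this.consumer = consumer;
    }

    // Safe to call from another thread, e.g. a JVM shutdown hook.
    void requestShutdown() {
        shutdown.set(true);
    }

    @Override
    public void run() {
        try {
            while (!shutdown.get()) {
                for (String value : consumer.poll(100)) {
                    processed.add(value); // placeholder for the real dispatch logic
                }
            }
        } finally {
            // Runs on normal shutdown and on exceptions alike, so the group
            // does not have to wait for the session timeout to drop this member.
            consumer.close();
        }
    }
}
```

With the real client, calling consumer.wakeup() from the shutdown hook makes a
blocked poll() throw WakeupException, so the finally block still runs. Lowering
session.timeout.ms is the other knob to consider, within the limits the broker
allows.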
