Dear Kafka Users,
I have updated the question in stackoverlflow . Please let me know about any possible solution. https://stackoverflow.com/questions/46233138/kafka-streams-application-unable-to-horizontally-scale-and-the-application-on-ot On Fri, Sep 15, 2017 at 2:57 PM, dev loper <spark...@gmail.com> wrote: > Hi Damian, > > I do have the logs for the other application. But its kind of huge since > it is continuously processing . Do you want me to grep anything specific > and share it with you ? > > Thanks > Dev > > On Fri, Sep 15, 2017 at 2:31 PM, Damian Guy <damian....@gmail.com> wrote: > >> Hi, >> >> Do you have the logs for the other instance? >> >> Thanks, >> Damian >> >> On Fri, 15 Sep 2017 at 07:19 dev loper <spark...@gmail.com> wrote: >> >> > Dear Kafka Users, >> > >> > I am fairly new to Kafka Streams . I have deployed two instances of >> Kafka >> > 0.11 brokers on AWS M3.Xlarge insatnces. I have created a topic with 36 >> > partitions .and speperate application writes to this topic and it >> produces >> > records at the rate of 10000 messages per second. I have threes >> instances >> > of AWS M4.xlarge instance where my Kafka streams application is >> running >> > which consumes these messages produced by the other application. The >> > application starts up fine working fine and its processing messages on >> the >> > first instance, but when I start the same application on other >> instances >> > it is not starting even though the process is alive it is not processing >> > messages.Also I could see the other instances takes a long time to >> start . >> > >> > Apart from first instance, other instances I could see the consumer >> > getting added and removed repeatedly and I couldn't see any message >> > processing at all . I have attached the detailed logs where this >> behavior >> > is observed. >> > >> > Consumer is getting started with below log in these instances and >> getting >> > stopped with below log (* detailed logs attached *) >> > >> > INFO | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) - >> > ConsumerConfig values: >> > auto.commit.interval.ms = 5000 >> > auto.offset.reset = latest >> > bootstrap.servers = [l-mykafkainstancekafka5101:9092, >> > l-mykafkainstancekafka5102:9092] >> > check.crcs = true >> > client.id = >> > connections.max.idle.ms = 540000 >> > enable.auto.commit = false >> > exclude.internal.topics = true >> > fetch.max.bytes = 52428800 >> > fetch.max.wait.ms = 500 >> > fetch.min.bytes = 1 >> > group.id = myKafka-kafkareplica101Sept08 >> > heartbeat.interval.ms = 3000 >> > interceptor.classes = null >> > internal.leave.group.on.close = true >> > isolation.level = read_uncommitted >> > key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec >> > max.partition.fetch.bytes = 1048576 >> > max.poll.interval.ms = 300000 >> > max.poll.records = 500 >> > metadata.max.age.ms = 300000 >> > metric.reporters = [] >> > metrics.num.samples = 2 >> > metrics.recording.level = INFO >> > metrics.sample.window.ms = 30000 >> > partition.assignment.strategy = [class >> > org.apache.kafka.clients.consumer.RangeAssignor] >> > receive.buffer.bytes = 65536 >> > reconnect.backoff.max.ms = 1000 >> > reconnect.backoff.ms = 50 >> > request.timeout.ms = 305000 >> > retry.backoff.ms = 100 >> > sasl.jaas.config = null >> > sasl.kerberos.kinit.cmd = /usr/bin/kinit >> > sasl.kerberos.min.time.before.relogin = 60000 >> > sasl.kerberos.service.name = null >> > sasl.kerberos.ticket.renew.jitter = 0.05 >> > sasl.kerberos.ticket.renew.window.factor = 0.8 >> > sasl.mechanism = GSSAPI >> > security.protocol = PLAINTEXT >> > send.buffer.bytes = 131072 >> > session.timeout.ms = 10000 >> > ssl.cipher.suites = null >> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >> > ssl.endpoint.identification.algorithm = null >> > ssl.key.password = null >> > ssl.keymanager.algorithm = SunX509 >> > ssl.keystore.location = null >> > ssl.keystore.password = null >> > ssl.keystore.type = JKS >> > ssl.protocol = TLS >> > ssl.provider = null >> > ssl.secure.random.implementation = null >> > ssl.trustmanager.algorithm = PKIX >> > ssl.truststore.location = null >> > ssl.truststore.password = null >> > ssl.truststore.type = JKS >> > value.deserializer = class my.dev.MessageUpdateCodec >> > >> > >> > DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) - >> The >> > Kafka consumer has closed. and the whole process repeats. >> > >> > >> > >> > Below you can find my startup code for kafkastreams and the parameters >> > which I have configured for starting the kafkastreams application . >> > >> > private static Properties settings = new Properties(); >> > settings.put(StreamsConfig.APPLICATION_ID_CONFIG, >> > "mykafkastreamsapplication"); >> > settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); >> > settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"1 >> 0000"); >> > settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000"); >> > >> > settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,Inte >> ger.MAX_VALUE); >> > settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000"); >> > >> > settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,"60000"); >> > >> > KStreamBuilder builder = new KStreamBuilder(); >> > KafkaStreams streams = new KafkaStreams(builder, settings); >> > builder.addSource(..... >> > .addProcessor ............. >> > .addProcessor ........ >> > >> > >> > .addStateStore(...................).persistent().build(),"myprocessor") >> > .addSink .............. >> > . addSink .............. >> > streams.start(); >> > >> > and I am using a Simple processor to process my logic .. >> > >> > public class InfoProcessor extends AbstractProcessor<Key, Update> { >> > private static Logger logger = Logger.getLogger(InfoProcessor.class); >> > private ProcessorContext context; >> > private KeyValueStore<Key, Info> infoStore; >> > >> > @Override >> > @SuppressWarnings("unchecked") >> > public void init(ProcessorContext context) { >> > this.context = context; >> > this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000); >> > infoStore = (KeyValueStore<Key, Info>) >> > context.getStateStore("InfoStore"); >> > } >> > >> > @Override >> > public void process(Key key, Update update) { >> > try { >> > if (key != null && update != null) { >> > Info info = infoStore.get(key); >> > // merge logic >> > infoStore.put(key, info); >> > } >> > >> > } catch (Exception e) { >> > logger.error(e.getMessage(), e); >> > } finally { >> > } >> > context.commit(); >> > } >> > >> > @Override >> > public void punctuate(long timestamp) { >> > try { >> > KeyValueIterator<Key, Info> iter = this.infoStore.all(); >> > while (iter.hasNext()) { >> > // processing logic >> > >> > } >> > iter.close(); >> > context.commit(); >> > } catch (Exception e) { >> > logger.error(e.getMessage(), e); >> > } >> > } >> > >> > >> > >> > >> > >> > >