-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA512 KAFKA-4344 was not a bug. The issues was as wrong initialization order of Kafka Streams by the user.
Please double check your initialization order (and maybe read the old email thread and JIRA comments -- it might have some relevant information for you to fix the issue for you). If the problem is still there, can you please reduce your code to a minimum example that reproduces the problem? Thanks! - -Matthias On 11/5/16 3:28 PM, srinivas koniki wrote: > > Hi, I'm still seeing the same issue with spring boot. Code is > below, sorry code is in groovy and not fully baked. Just have > single processor. It worked well with single partition. But when i > increased the partitions, started seeing the error as in this > kafka-4344. > > > import com.codahale.metrics.MetricRegistry import > org.apache.kafka.clients.consumer.ConsumerConfig import > org.apache.kafka.clients.producer.KafkaProducer import > org.apache.kafka.clients.producer.ProducerRecord import > org.apache.kafka.common.serialization.Serdes import > org.apache.kafka.streams.KafkaStreams import > org.apache.kafka.streams.StreamsConfig import > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster > import org.apache.kafka.streams.processor.AbstractProcessor import > org.apache.kafka.streams.processor.ProcessorSupplier import > org.apache.kafka.streams.processor.TopologyBuilder import > org.aspectj.lang.ProceedingJoinPoint import > org.aspectj.lang.annotation.AfterReturning import > org.aspectj.lang.annotation.Around import > org.aspectj.lang.annotation.Aspect import > org.aspectj.lang.annotation.Pointcut import > org.springframework.beans.factory.annotation.Autowired import > org.springframework.beans.factory.annotation.Value import > org.springframework.boot.actuate.metrics.CounterService import > org.springframework.boot.actuate.metrics.GaugeService import > org.springframework.boot.autoconfigure.SpringBootApplication import > org.springframework.boot.test.context.SpringBootTest import > org.springframework.context.Lifecycle import > org.springframework.context.annotation.Bean import > org.springframework.context.annotation.Configuration import > org.springframework.context.annotation.Import import > org.springframework.context.support.PropertySourcesPlaceholderConfigur er > > import org.springframework.stereotype.Component > import org.springframework.test.context.ContextConfiguration import > org.springframework.util.StopWatch import spock.lang.Shared import > spock.lang.Specification > > import java.util.concurrent.Future import > java.util.stream.IntStream > > /** * Created by srinivas.koniki on 11/5/16. */ > @ContextConfiguration(classes=[TestConfig, MetricsAspect, > RiakService]) @SpringBootTest(webEnvironment = > SpringBootTest.WebEnvironment.RANDOM_PORT) class MetricsSpec > extends Specification{ > > static String kafkaTopic = 'testTopic' > > @Shared TestConfig testConfigRef > > @Autowired TestConfig testConfig > > @Autowired MetricRegistry metricRegistry > > @Autowired KafkaProducer kafkaProducer > > @Shared static final EmbeddedKafkaCluster CLUSTER = new > EmbeddedKafkaCluster(1) > > def setupSpec() { println("Heavy init for all the tests...") > CLUSTER.start() > System.setProperty('broker.url',CLUSTER.bootstrapServers()) > System.setProperty('zk.url',CLUSTER.zKConnectString()) > System.setProperty('kafka.topic',kafkaTopic) > CLUSTER.createTopic(kafkaTopic, 3, 1) } > > def cleanupSpec() { testConfigRef.stop() CLUSTER.stop() } > > def "Test send and receive" (){ expect: testConfig != null > metricRegistry != null println ''+metricRegistry.getGauges() > > when: testConfigRef = testConfig testConfig.start() List<Future> > futureList = new ArrayList<>() IntStream.range(1,4).forEach({ i -> > Future future = kafkaProducer.send(new ProducerRecord<String, > String>(kafkaTopic, 'test'+i, 'testMesg'+i)) }) > > futureList.forEach({ future -> println future.get() }) then: > Thread.sleep(20000) > > println ''+metricRegistry.getGauges() println > ''+metricRegistry.counters > metricRegistry.counters.keySet().forEach({key -> println > key+':'+metricRegistry.counters.get(key).count }) > Thread.sleep(2000) } > > @Configuration @SpringBootApplication static class TestConfig > implements Lifecycle { > > @Value('${broker.url}') String brokerUrl > > Map<String, String> producerConfig(){ def props = > ["bootstrap.servers" : brokerUrl, "acks" : "all", "retries": 0, > "batch.size": 16384, "linger.ms": 1, "buffer.memory" : 33554432, > "key.serializer": > "org.apache.kafka.common.serialization.StringSerializer", > "value.serializer" : > "org.apache.kafka.common.serialization.StringSerializer" ] } > > @Bean KafkaProducer<String, String> kafkaProducer() { new > KafkaProducer<String, String>(producerConfig()) } > > @Bean public static PropertySourcesPlaceholderConfigurer > properties() { return new PropertySourcesPlaceholderConfigurer() } > > @Value('${zk.url}') String zkUrl > > @Value('${kafka.topic}') String kafkaTopic > > @Autowired RiakService riakService > > @Autowired CounterService counterService > > KafkaStreams streams > > boolean state > > @Override void start() { println 'starting streams' Properties > props = new Properties(); props.put('group.id', > "streams-test-processor-group"); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "streams-test-processor"); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); > props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zkUrl); > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); > TopologyBuilder builder = new TopologyBuilder(); > builder.addSource("Source", kafkaTopic); def processor = new > PriceProcessor(riakService, counterService) def procSupplier = > {processor} as ProcessorSupplier builder.addProcessor("Process", > procSupplier, "Source"); > > streams = new KafkaStreams(builder, props); streams.start() println > ' Streams started' state = true } > > @Override void stop() { streams.close() state = false } > > @Override boolean isRunning() { return state } > > } > > static class PriceProcessor extends AbstractProcessor<String, > String> { RiakService riakClient1 CounterService counterService > PriceProcessor(RiakService riakClient2, CounterService > counterService1){ this.riakClient1 = riakClient2 > this.counterService = counterService1 } > > @Override void process(String key, String value) { > riakClient1.save(key, value) > > context().commit() println 'offset-'+context().offset()+', > partition -'+context().partition() > counterService.increment('kafka.partition.'+context().partition()+'.of fset') > > } > > > } > > @Aspect @Component static class MetricsAspect { final > CounterService counterService final GaugeService gaugeService > > @Autowired MetricsAspect(GaugeService gaugeService1, CounterService > counterService1){ println 'aspect init' this.gaugeService = > gaugeService1 this.counterService = counterService1 } > > @Pointcut("execution(* > com.bsb.retail.price.kafka.RiakService.save(..))") public void > methodsToBeProfiled(){} > > @Around("methodsToBeProfiled()") public Object > profile(ProceedingJoinPoint pjp) throws Throwable { StopWatch sw = > new StopWatch(getClass().getSimpleName()); try { > sw.start(pjp.getSignature().getName()); return pjp.proceed(); } > finally { sw.stop(); System.out.println(sw.prettyPrint()); > gaugeService.submit('riakSave', sw.lastTaskTimeMillis) } > > } > > @AfterReturning(pointcut = "execution(* > com.bsb.retail.price.RiakService.save(..)) ") void metric(){ > println 'increment metrics counter' > counterService.increment('message.count') } > > > > } > > > } > > @Component class RiakService { void save(String key, String value) > { println 'Sending to riak '+Thread.currentThread().name } } > -----BEGIN PGP SIGNATURE----- Comment: GPGTools - https://gpgtools.org iQIcBAEBCgAGBQJYILUqAAoJECnhiMLycopPMDQP/RZZTwXm0YOVgnAqvvObGwzq MCrigCz0+RmFMbGStOVQcRvzEMu1ZAXi6EIq32GUGJtC1L6xwaXamH4IZ9+u/hbi w8mp9YiQX9RUJEqDYZp0L7P2PfWamMVTz6ALh5xRlBnPIQrsvTaVZmFsn1/B6peM 50/XldUaRNb1RKzpDwjP+K1Y2pfsMxcvvG2VXQUNF6pnjpwETGyOyGzFcl1cX4pc vg+pkLb7E5WktDw2c18/bImZqji+P/ofuduBLqoAv19/p7gBFRO3UHyjnb3sl/Yp sHv06kXy13jPJP+6O7jIJo7+0IKOVReoOsJnIYsITi/odQXFA0b7wT42v0Xx2d1+ 9YquS5ue9wvN0epngNtlpr+ADzhn0cTa9bDnLUi8RONzgmoZOn39QfeTsEvGbF5l kR1/1a9BPgK/O11b8rI13obBZxT/XdtPtDmZCBCBfXnSEc7/88Ag8eLKPd6fFpeU 81FXPfPsQklk0UXQck5zH/sm+AZMpAYJIPphRLIP4NNpcBG1XrP+tCWjZuE+lqCZ DPJ3f41ahP6cj1i0LFleIzIi77k0QHCk6tJISsxo1g5XvVARNvC0EcygJp0utWet pILJ+o9+l/d5mF19gyEsSpzzoLowvTe4h/fnDkHsSjaJVFrK3XZ/WSI/Drmo5Tjo +4ur3dG49ntS/uuTwche =1JgC -----END PGP SIGNATURE-----