Hi, I'm still seeing the same issue with Spring Boot. The code is below; sorry, it is in Groovy and not fully baked. It has just a single processor. It worked well with a single partition, but when I increased the number of partitions I started seeing the same error as in KAFKA-4344.
import com.codahale.metrics.MetricRegistry import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster import org.apache.kafka.streams.processor.AbstractProcessor import org.apache.kafka.streams.processor.ProcessorSupplier import org.apache.kafka.streams.processor.TopologyBuilder import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation.AfterReturning import org.aspectj.lang.annotation.Around import org.aspectj.lang.annotation.Aspect import org.aspectj.lang.annotation.Pointcut import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value import org.springframework.boot.actuate.metrics.CounterService import org.springframework.boot.actuate.metrics.GaugeService import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.test.context.SpringBootTest import org.springframework.context.Lifecycle import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import import org.springframework.context.support.PropertySourcesPlaceholderConfigurer import org.springframework.stereotype.Component import org.springframework.test.context.ContextConfiguration import org.springframework.util.StopWatch import spock.lang.Shared import spock.lang.Specification import java.util.concurrent.Future import java.util.stream.IntStream /** * Created by srinivas.koniki on 11/5/16. 
*/

/**
 * Spock integration spec that starts an embedded Kafka cluster, publishes three
 * records to a three-partition topic and lets a Kafka Streams topology
 * ({@link TestConfig}) hand every record to {@link RiakService} while
 * {@link MetricsAspect} records timing and count metrics.
 */
@ContextConfiguration(classes = [TestConfig, MetricsAspect, RiakService])
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class MetricsSpec extends Specification {

    static String kafkaTopic = 'testTopic'

    /** Copy of the injected config kept so that {@code cleanupSpec()} can stop the streams. */
    @Shared
    TestConfig testConfigRef

    @Autowired
    TestConfig testConfig

    @Autowired
    MetricRegistry metricRegistry

    @Autowired
    KafkaProducer kafkaProducer

    @Shared
    static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1)

    /**
     * Starts the embedded cluster, publishes its addresses as system properties
     * (read back through the {@code @Value} placeholders in {@link TestConfig})
     * and creates the test topic with 3 partitions and replication factor 1.
     */
    def setupSpec() {
        println("Heavy init for all the tests...")
        CLUSTER.start()
        System.setProperty('broker.url', CLUSTER.bootstrapServers())
        System.setProperty('zk.url', CLUSTER.zKConnectString())
        System.setProperty('kafka.topic', kafkaTopic)
        CLUSTER.createTopic(kafkaTopic, 3, 1)
    }

    /**
     * Stops the streams application (if the feature got far enough to start it)
     * and then the embedded cluster.
     */
    def cleanupSpec() {
        try {
            // testConfigRef is only assigned inside the feature method; it is still
            // null when the feature failed early, so guard against an NPE here.
            testConfigRef?.stop()
        } finally {
            // Always shut the cluster down, even if stopping the streams failed.
            CLUSTER.stop()
        }
    }

    def "Test send and receive"() {
        expect:
        testConfig != null
        metricRegistry != null
        println '' + metricRegistry.getGauges()

        when:
        testConfigRef = testConfig
        testConfig.start()
        List<Future> futureList = new ArrayList<>()
        IntStream.range(1, 4).forEach({ i ->
            Future future = kafkaProducer.send(
                    new ProducerRecord<String, String>(kafkaTopic, 'test' + i, 'testMesg' + i))
            // Keep the future so the loop below actually waits for the send to complete.
            futureList.add(future)
        })
        futureList.forEach({ future ->
            println future.get()
        })

        then:
        // Give the streams threads time to consume and process the records.
        Thread.sleep(20000)
        println '' + metricRegistry.getGauges()
        println '' + metricRegistry.counters
        metricRegistry.counters.keySet().forEach({ key ->
            println key + ':' + metricRegistry.counters.get(key).count
        })
        Thread.sleep(2000)
    }

    /**
     * Spring configuration for the spec. Provides the Kafka producer bean and,
     * through {@link Lifecycle}, builds and runs a one-processor Kafka Streams
     * topology that reads {@code kafka.topic}.
     */
    @Configuration
    @SpringBootApplication
    static class TestConfig implements Lifecycle {

        @Value('${broker.url}')
        String brokerUrl

        @Value('${zk.url}')
        String zkUrl

        @Value('${kafka.topic}')
        String kafkaTopic

        @Autowired
        RiakService riakService

        @Autowired
        CounterService counterService

        KafkaStreams streams

        /** True between a successful {@link #start()} and the next {@link #stop()}. */
        boolean state

        /**
         * @return the producer settings passed to the {@link KafkaProducer} constructor
         */
        Map<String, String> producerConfig() {
            return ["bootstrap.servers": brokerUrl,
                    "acks"             : "all",
                    "retries"          : 0,
                    "batch.size"       : 16384,
                    "linger.ms"        : 1,
                    "buffer.memory"    : 33554432,
                    "key.serializer"   : "org.apache.kafka.common.serialization.StringSerializer",
                    "value.serializer" : "org.apache.kafka.common.serialization.StringSerializer"
            ]
        }

        @Bean
        KafkaProducer<String, String> kafkaProducer() {
            new KafkaProducer<String, String>(producerConfig())
        }

        @Bean
        public static PropertySourcesPlaceholderConfigurer properties() {
            return new PropertySourcesPlaceholderConfigurer()
        }

        /**
         * Builds the Source -> Process topology and starts the streams application.
         */
        @Override
        void start() {
            println 'starting streams'
            Properties props = new Properties()
            props.put('group.id', "streams-test-processor-group")
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test-processor")
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zkUrl)
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass())
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass())
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

            TopologyBuilder builder = new TopologyBuilder()
            builder.addSource("Source", kafkaTopic)

            // The supplier must hand out a NEW processor on every get() call. A
            // processor is bound to the context it is initialised with, so one
            // shared instance across the tasks of a multi-partition topic ends up
            // reading another task's context (the failure described in KAFKA-4344).
            def procSupplier = { new PriceProcessor(riakService, counterService) } as ProcessorSupplier
            builder.addProcessor("Process", procSupplier, "Source")

            streams = new KafkaStreams(builder, props)
            streams.start()
            println ' Streams started'
            state = true
        }

        /**
         * Closes the streams application if it was started and clears the running flag.
         */
        @Override
        void stop() {
            // streams is null when start() was never called or failed before assignment.
            streams?.close()
            state = false
        }

        @Override
        boolean isRunning() {
            return state
        }
    }

    /**
     * Stream processor that saves each record through {@link RiakService},
     * requests a commit and bumps a per-partition counter.
     */
    static class PriceProcessor extends AbstractProcessor<String, String> {

        RiakService riakClient1
        CounterService counterService

        PriceProcessor(RiakService riakClient2, CounterService counterService1) {
            this.riakClient1 = riakClient2
            this.counterService = counterService1
        }

        /**
         * @param key   record key
         * @param value record value
         */
        @Override
        void process(String key, String value) {
            riakClient1.save(key, value)
            context().commit()
            println 'offset-' + context().offset() + ', partition -' + context().partition()
            counterService.increment('kafka.partition.' + context().partition() + '.offset')
        }
    }

    /**
     * Aspect that times {@code RiakService.save(..)} into the {@code riakSave}
     * gauge and counts successful calls in the {@code message.count} counter.
     */
    @Aspect
    @Component
    static class MetricsAspect {

        final CounterService counterService
        final GaugeService gaugeService

        @Autowired
        MetricsAspect(GaugeService gaugeService1, CounterService counterService1) {
            println 'aspect init'
            this.gaugeService = gaugeService1
            this.counterService = counterService1
        }

        // NOTE(review): the package of RiakService is not visible in this file;
        // confirm that com.bsb.retail.price.kafka is the package it is compiled into.
        @Pointcut("execution(* com.bsb.retail.price.kafka.RiakService.save(..))")
        public void methodsToBeProfiled() {}

        /**
         * Times the intercepted call, prints the stopwatch summary and submits
         * the elapsed milliseconds to the {@code riakSave} gauge.
         *
         * @return whatever the intercepted method returns
         * @throws Throwable whatever the intercepted method throws
         */
        @Around("methodsToBeProfiled()")
        public Object profile(ProceedingJoinPoint pjp) throws Throwable {
            StopWatch sw = new StopWatch(getClass().getSimpleName())
            try {
                sw.start(pjp.getSignature().getName())
                return pjp.proceed()
            } finally {
                sw.stop()
                System.out.println(sw.prettyPrint())
                gaugeService.submit('riakSave', sw.lastTaskTimeMillis)
            }
        }

        /**
         * Increments {@code message.count} after each successful save. Reuses the
         * named pointcut so this advice and {@link #profile} always target the
         * same method (the two expressions previously named different packages).
         */
        @AfterReturning(pointcut = "methodsToBeProfiled()")
        void metric() {
            println 'increment metrics counter'
            counterService.increment('message.count')
        }
    }
}

/**
 * Stand-in for the real Riak client: only prints the name of the calling thread.
 */
@Component
class RiakService {

    void save(String key, String value) {
        println 'Sending to riak ' + Thread.currentThread().name
    }
}