Hi,
I'm still seeing the same issue with spring boot. Code is below, sorry code is 
in groovy and not fully baked. Just have single processor. It worked well with 
single partition. But when i increased the partitions, started seeing the error 
as in this kafka-4344.


import com.codahale.metrics.MetricRegistry
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
import org.apache.kafka.streams.processor.AbstractProcessor
import org.apache.kafka.streams.processor.ProcessorSupplier
import org.apache.kafka.streams.processor.TopologyBuilder
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.AfterReturning
import org.aspectj.lang.annotation.Around
import org.aspectj.lang.annotation.Aspect
import org.aspectj.lang.annotation.Pointcut
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.actuate.metrics.CounterService
import org.springframework.boot.actuate.metrics.GaugeService
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.Lifecycle
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer
import org.springframework.stereotype.Component
import org.springframework.test.context.ContextConfiguration
import org.springframework.util.StopWatch
import spock.lang.Shared
import spock.lang.Specification

import java.util.concurrent.Future
import java.util.stream.IntStream

/**
 * Created by srinivas.koniki on 11/5/16.
 */
@ContextConfiguration(classes=[TestConfig, MetricsAspect, RiakService])
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class MetricsSpec extends Specification{

    static String kafkaTopic = 'testTopic'

    @Shared
    TestConfig testConfigRef

    @Autowired
    TestConfig testConfig

    @Autowired
    MetricRegistry metricRegistry

    @Autowired
    KafkaProducer kafkaProducer

    @Shared
    static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1)

    def setupSpec() {
        println("Heavy init for all the tests...")
        CLUSTER.start()
        System.setProperty('broker.url',CLUSTER.bootstrapServers())
        System.setProperty('zk.url',CLUSTER.zKConnectString())
        System.setProperty('kafka.topic',kafkaTopic)
        CLUSTER.createTopic(kafkaTopic, 3, 1)
    }

    def cleanupSpec() {
        testConfigRef.stop()
        CLUSTER.stop()
    }

    def "Test send and receive" (){
        expect:
        testConfig != null
        metricRegistry != null
        println ''+metricRegistry.getGauges()

        when:
        testConfigRef = testConfig
        testConfig.start()
        List<Future> futureList = new ArrayList<>()
        IntStream.range(1,4).forEach({ i ->
            Future future = kafkaProducer.send(new ProducerRecord<String, 
String>(kafkaTopic, 'test'+i, 'testMesg'+i))
        })

        futureList.forEach({ future ->
           println future.get()
        })
        then:
        Thread.sleep(20000)

        println ''+metricRegistry.getGauges()
        println ''+metricRegistry.counters
        metricRegistry.counters.keySet().forEach({key ->
            println key+':'+metricRegistry.counters.get(key).count
        })
        Thread.sleep(2000)
    }

    @Configuration
    @SpringBootApplication
    static class TestConfig implements Lifecycle {

        @Value('${broker.url}')
        String brokerUrl

        Map<String, String> producerConfig(){
            def props = ["bootstrap.servers" : brokerUrl, "acks" : "all", 
"retries": 0, "batch.size": 16384, "linger.ms": 1,
                         "buffer.memory" : 33554432,
                         "key.serializer": 
"org.apache.kafka.common.serialization.StringSerializer",
                         "value.serializer" : 
"org.apache.kafka.common.serialization.StringSerializer"
            ]
        }

        @Bean
        KafkaProducer<String, String> kafkaProducer() {
            new KafkaProducer<String, String>(producerConfig())
        }

        @Bean
        public static PropertySourcesPlaceholderConfigurer properties() {
            return new PropertySourcesPlaceholderConfigurer()
        }

        @Value('${zk.url}')
        String zkUrl

        @Value('${kafka.topic}')
        String kafkaTopic

        @Autowired
        RiakService riakService

        @Autowired
        CounterService counterService

        KafkaStreams streams

        boolean state

        @Override
        void start() {
            println 'starting streams'
            Properties props = new Properties();
            props.put('group.id', "streams-test-processor-group");
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"streams-test-processor");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zkUrl);
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("Source", kafkaTopic);
            def processor = new PriceProcessor(riakService, counterService)
            def procSupplier = {processor} as ProcessorSupplier
            builder.addProcessor("Process", procSupplier, "Source");

            streams = new KafkaStreams(builder, props);
            streams.start()
            println ' Streams started'
            state = true
        }

        @Override
        void stop() {
            streams.close()
            state = false
        }

        @Override
        boolean isRunning() {
            return state
        }

    }

    static class PriceProcessor extends AbstractProcessor<String, String> {
        RiakService riakClient1
        CounterService counterService
        PriceProcessor(RiakService riakClient2, CounterService counterService1){
            this.riakClient1 = riakClient2
            this.counterService = counterService1
        }

        @Override
        void process(String key, String value) {
            riakClient1.save(key, value)

            context().commit()
            println 'offset-'+context().offset()+', partition 
-'+context().partition()
            
counterService.increment('kafka.partition.'+context().partition()+'.offset')
        }


    }

    @Aspect
    @Component
    static class MetricsAspect {
        final CounterService counterService
        final GaugeService gaugeService

        @Autowired
        MetricsAspect(GaugeService gaugeService1, CounterService 
counterService1){
            println 'aspect init'
            this.gaugeService = gaugeService1
            this.counterService = counterService1
        }

        @Pointcut("execution(* 
com.bsb.retail.price.kafka.RiakService.save(..))")
        public void methodsToBeProfiled(){}

        @Around("methodsToBeProfiled()")
        public Object profile(ProceedingJoinPoint pjp) throws Throwable {
            StopWatch sw = new StopWatch(getClass().getSimpleName());
            try {
                sw.start(pjp.getSignature().getName());
                return pjp.proceed();
            } finally {
                sw.stop();
                System.out.println(sw.prettyPrint());
                gaugeService.submit('riakSave', sw.lastTaskTimeMillis)
            }

        }

        @AfterReturning(pointcut = "execution(* 
com.bsb.retail.price.RiakService.save(..)) ")
        void metric(){
            println 'increment metrics counter'
            counterService.increment('message.count')
        }



    }


}

@Component
class RiakService {
    void save(String key, String value) {
        println 'Sending to riak '+Thread.currentThread().name
    }
}

Reply via email to