-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

KAFKA-4344 was not a bug. The issue was a wrong initialization order
of Kafka Streams by the user.

Please double check your initialization order (and maybe read the old
email thread and JIRA comments -- they might contain some information
that helps you fix the issue).

If the problem is still there, can you please reduce your code to a
minimum example that reproduces the problem?

Thanks!

- -Matthias

On 11/5/16 3:28 PM, srinivas koniki wrote:
> 
> Hi, I'm still seeing the same issue with Spring Boot. The code is
> below; sorry, it is in Groovy and not fully baked. I just have a
> single processor. It worked well with a single partition, but when I
> increased the number of partitions, I started seeing the same error
> as in KAFKA-4344.
> 
> 
> import com.codahale.metrics.MetricRegistry import
> org.apache.kafka.clients.consumer.ConsumerConfig import
> org.apache.kafka.clients.producer.KafkaProducer import
> org.apache.kafka.clients.producer.ProducerRecord import
> org.apache.kafka.common.serialization.Serdes import
> org.apache.kafka.streams.KafkaStreams import
> org.apache.kafka.streams.StreamsConfig import
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster 
> import org.apache.kafka.streams.processor.AbstractProcessor import
> org.apache.kafka.streams.processor.ProcessorSupplier import
> org.apache.kafka.streams.processor.TopologyBuilder import
> org.aspectj.lang.ProceedingJoinPoint import
> org.aspectj.lang.annotation.AfterReturning import
> org.aspectj.lang.annotation.Around import
> org.aspectj.lang.annotation.Aspect import
> org.aspectj.lang.annotation.Pointcut import
> org.springframework.beans.factory.annotation.Autowired import
> org.springframework.beans.factory.annotation.Value import
> org.springframework.boot.actuate.metrics.CounterService import
> org.springframework.boot.actuate.metrics.GaugeService import
> org.springframework.boot.autoconfigure.SpringBootApplication import
> org.springframework.boot.test.context.SpringBootTest import
> org.springframework.context.Lifecycle import
> org.springframework.context.annotation.Bean import
> org.springframework.context.annotation.Configuration import
> org.springframework.context.annotation.Import import
> org.springframework.context.support.PropertySourcesPlaceholderConfigurer
> import org.springframework.stereotype.Component
> import org.springframework.test.context.ContextConfiguration import
> org.springframework.util.StopWatch import spock.lang.Shared import
> spock.lang.Specification
> 
> import java.util.concurrent.Future import
> java.util.stream.IntStream
> 
> /** * Created by srinivas.koniki on 11/5/16. */ 
> @ContextConfiguration(classes=[TestConfig, MetricsAspect,
> RiakService]) @SpringBootTest(webEnvironment =
> SpringBootTest.WebEnvironment.RANDOM_PORT) class MetricsSpec
> extends Specification{
> 
> static String kafkaTopic = 'testTopic'
> 
> @Shared TestConfig testConfigRef
> 
> @Autowired TestConfig testConfig
> 
> @Autowired MetricRegistry metricRegistry
> 
> @Autowired KafkaProducer kafkaProducer
> 
> @Shared static final EmbeddedKafkaCluster CLUSTER = new
> EmbeddedKafkaCluster(1)
> 
> def setupSpec() { println("Heavy init for all the tests...") 
> CLUSTER.start() 
> System.setProperty('broker.url',CLUSTER.bootstrapServers()) 
> System.setProperty('zk.url',CLUSTER.zKConnectString()) 
> System.setProperty('kafka.topic',kafkaTopic) 
> CLUSTER.createTopic(kafkaTopic, 3, 1) }
> 
> def cleanupSpec() { testConfigRef.stop() CLUSTER.stop() }
> 
> def "Test send and receive" (){ expect: testConfig != null 
> metricRegistry != null println ''+metricRegistry.getGauges()
> 
> when: testConfigRef = testConfig testConfig.start() List<Future>
> futureList = new ArrayList<>() IntStream.range(1,4).forEach({ i -> 
> Future future = kafkaProducer.send(new ProducerRecord<String,
> String>(kafkaTopic, 'test'+i, 'testMesg'+i)) })
> 
> futureList.forEach({ future -> println future.get() }) then: 
> Thread.sleep(20000)
> 
> println ''+metricRegistry.getGauges() println
> ''+metricRegistry.counters 
> metricRegistry.counters.keySet().forEach({key -> println
> key+':'+metricRegistry.counters.get(key).count }) 
> Thread.sleep(2000) }
> 
> @Configuration @SpringBootApplication static class TestConfig
> implements Lifecycle {
> 
> @Value('${broker.url}') String brokerUrl
> 
> Map<String, String> producerConfig(){ def props =
> ["bootstrap.servers" : brokerUrl, "acks" : "all", "retries": 0,
> "batch.size": 16384, "linger.ms": 1, "buffer.memory" : 33554432, 
> "key.serializer":
> "org.apache.kafka.common.serialization.StringSerializer", 
> "value.serializer" :
> "org.apache.kafka.common.serialization.StringSerializer" ] }
> 
> @Bean KafkaProducer<String, String> kafkaProducer() { new
> KafkaProducer<String, String>(producerConfig()) }
> 
> @Bean public static PropertySourcesPlaceholderConfigurer
> properties() { return new PropertySourcesPlaceholderConfigurer() }
> 
> @Value('${zk.url}') String zkUrl
> 
> @Value('${kafka.topic}') String kafkaTopic
> 
> @Autowired RiakService riakService
> 
> @Autowired CounterService counterService
> 
> KafkaStreams streams
> 
> boolean state
> 
> @Override void start() { println 'starting streams' Properties
> props = new Properties(); props.put('group.id',
> "streams-test-processor-group"); 
> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "streams-test-processor"); 
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); 
> props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zkUrl); 
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass()); 
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass()); 
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
> TopologyBuilder builder = new TopologyBuilder(); 
> builder.addSource("Source", kafkaTopic); def processor = new
> PriceProcessor(riakService, counterService) def procSupplier =
> {processor} as ProcessorSupplier builder.addProcessor("Process",
> procSupplier, "Source");
> 
> streams = new KafkaStreams(builder, props); streams.start() println
> ' Streams started' state = true }
> 
> @Override void stop() { streams.close() state = false }
> 
> @Override boolean isRunning() { return state }
> 
> }
> 
> static class PriceProcessor extends AbstractProcessor<String,
> String> { RiakService riakClient1 CounterService counterService 
> PriceProcessor(RiakService riakClient2, CounterService
> counterService1){ this.riakClient1 = riakClient2 
> this.counterService = counterService1 }
> 
> @Override void process(String key, String value) { 
> riakClient1.save(key, value)
> 
> context().commit() println 'offset-'+context().offset()+',
> partition -'+context().partition() 
> counterService.increment('kafka.partition.'+context().partition()+'.offset')
> }
> 
> 
> }
> 
> @Aspect @Component static class MetricsAspect { final
> CounterService counterService final GaugeService gaugeService
> 
> @Autowired MetricsAspect(GaugeService gaugeService1, CounterService
> counterService1){ println 'aspect init' this.gaugeService =
> gaugeService1 this.counterService = counterService1 }
> 
> @Pointcut("execution(*
> com.bsb.retail.price.kafka.RiakService.save(..))") public void
> methodsToBeProfiled(){}
> 
> @Around("methodsToBeProfiled()") public Object
> profile(ProceedingJoinPoint pjp) throws Throwable { StopWatch sw =
> new StopWatch(getClass().getSimpleName()); try { 
> sw.start(pjp.getSignature().getName()); return pjp.proceed(); }
> finally { sw.stop(); System.out.println(sw.prettyPrint()); 
> gaugeService.submit('riakSave', sw.lastTaskTimeMillis) }
> 
> }
> 
> @AfterReturning(pointcut = "execution(*
> com.bsb.retail.price.RiakService.save(..)) ") void metric(){ 
> println 'increment metrics counter' 
> counterService.increment('message.count') }
> 
> 
> 
> }
> 
> 
> }
> 
> @Component class RiakService { void save(String key, String value)
> { println 'Sending to riak '+Thread.currentThread().name } }
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYILUqAAoJECnhiMLycopPMDQP/RZZTwXm0YOVgnAqvvObGwzq
MCrigCz0+RmFMbGStOVQcRvzEMu1ZAXi6EIq32GUGJtC1L6xwaXamH4IZ9+u/hbi
w8mp9YiQX9RUJEqDYZp0L7P2PfWamMVTz6ALh5xRlBnPIQrsvTaVZmFsn1/B6peM
50/XldUaRNb1RKzpDwjP+K1Y2pfsMxcvvG2VXQUNF6pnjpwETGyOyGzFcl1cX4pc
vg+pkLb7E5WktDw2c18/bImZqji+P/ofuduBLqoAv19/p7gBFRO3UHyjnb3sl/Yp
sHv06kXy13jPJP+6O7jIJo7+0IKOVReoOsJnIYsITi/odQXFA0b7wT42v0Xx2d1+
9YquS5ue9wvN0epngNtlpr+ADzhn0cTa9bDnLUi8RONzgmoZOn39QfeTsEvGbF5l
kR1/1a9BPgK/O11b8rI13obBZxT/XdtPtDmZCBCBfXnSEc7/88Ag8eLKPd6fFpeU
81FXPfPsQklk0UXQck5zH/sm+AZMpAYJIPphRLIP4NNpcBG1XrP+tCWjZuE+lqCZ
DPJ3f41ahP6cj1i0LFleIzIi77k0QHCk6tJISsxo1g5XvVARNvC0EcygJp0utWet
pILJ+o9+l/d5mF19gyEsSpzzoLowvTe4h/fnDkHsSjaJVFrK3XZ/WSI/Drmo5Tjo
+4ur3dG49ntS/uuTwche
=1JgC
-----END PGP SIGNATURE-----

Reply via email to