Thanks Matthias. Can you also please confirm the compatible versions of the client dependencies . Our broker version is 10.2.1 and when i updgrade the client library to 1.1.0, i am getting a issue with tests while starting the embedded cluster . Test dependencies are (kafka-stream.version is 1.1.0)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>${kafka-stream.version}</version> <classifier>test</classifier> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka-stream.version}</version> <classifier>test</classifier> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka-stream.version}</version> <classifier>test</classifier> <scope>test</scope> </dependency> Test error ======= java.lang.AbstractMethodError: kafka.zk.EmbeddedZookeeper.kafka$utils$Logging$_setter_$loggerName_$eq(Ljava/lang/String;)V at kafka.utils.Logging$class.$init$(Logging.scala:23) at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:37) at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.start(EmbeddedKafkaCluster.java:87) at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.before(EmbeddedKafkaCluster.java:153) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46) at com.github.tomakehurst.wiremock.junit.WireMockClassRule$1.evaluate(WireMockClassRule.java:70) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) On Sat, Apr 21, 2018 at 3:06 AM, Matthias J. Sax <matth...@confluent.io> wrote: > You are hitting: https://issues.apache.org/jira/browse/KAFKA-6499 > > Was fixed in 1.1 release. > > Thus, you can just ignore the checkpoint file. There should be no issue > with running on Kubernetes. > > Also, if there is no store (independent of disk based or in-memory) > there will be no changelog topic. > > > -Matthias > > On 4/21/18 8:34 AM, pradeep s wrote: > > Hi, > > I am using kafka streams app connecting to confluent kafka > cluster(10.2.1). > > Application is reading messages from a topic, performing a tranformation > > and pushing to output topic . There is no count or aggregation performed > . > > Have following clarifications regarding state directory. > > > > *1)* Will there be any data written in state directory? > > When i verified the state directory , it was showing > > 0 > > 0 > > > > *2)* Application is running in kubernetes without any external volumes . > > Will state directory cause any processing issue during kubernetes pod > > restarts? > > > > *3)* Will the app creates a changelog topic since there is no in memory > > store used in the app? > > > > > > Code Snippet > > =========== > > > > Stream Config > > ============= > > > > properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > > streamEnvironment.getKafkaBootstrapServers()); > > properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, > > NUMBER_OF_STREAM_THREADS); > > properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > COMMIT_INTERVAL_MS); > > > > > > properties.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIRECTORY); > > properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_ > EXCEPTION_HANDLER_CLASS_CONFIG, > > ItemDeserializationExceptionHandler.class); > > > > > > > > Stream builder > > ============= > > > > private void processStream(StreamsBuilder builder) { > > KStream<byte[], byte[]> input = builder.stream(inputTopic, > > Consumed.with(byteArraySerde, byteArraySerde)) > > .peek((key, value) -> > > metricsClient.writeMetric( > > > > CountMetric.generate(METRIC_OFFER_INPUT, 1))); > > > > KStream<byte[], DeserializationResultWrapper> > > deserializationResultStream = input > > .mapValues(this::deserializeOffer); > > > > quarantineNonDeSerializableOffers(deserializationResultStream); > > > > KStream<byte[], List<TransformerResult>> trans = > > transformOffers(deserializationResultStream); > > > > produceToQuarantineTopic(trans); > > > > produceToOutputTopic(trans); > > > > } > > > > private void produceToOutputTopic(KStream<byte[], > > List<TransformerResult>> trans) { > > trans.filter((key, value) -> value != null > > && !value.isEmpty()) > > .peek((key, value) -> > > metricsClient.writeMetric(CountMetric.generate(METRIC_ITEMS_OUTPUT, > > 1))) > > .flatMapValues(transformerResults -> > transformerResults.stream() > > > > .map(TransformerResult::getItem) > > > > .filter(Objects::nonNull) > > > > .collect(Collectors.toCollection(ArrayList::new))) > > .to(outputTopic, Produced.with(byteArraySerde, > itemEnvelopeSerde)); > > } > > > > private void produceToQuarantineTopic(KStream<byte[], > > List<TransformerResult>> trans) { > > trans.filter((key, value) -> value == null || value.isEmpty() > > || > > value.stream().anyMatch(TransformerResult::hasErrors)) > > .mapValues(val -> toQuarantineEnvelope(val, INVALID_SKU)) > > .to(quarantineTopic, Produced.with(byteArraySerde, > > quarantineItemEnvelopeSerde)); > > } > > > > Thanks > > Pradeep > > > >