Kafka Streams itself is backward compatible to 0.10.2.1 brokers. However, the embedded cluster you are using is not part of public API and the 0.10.2.1 embedded cluster might have a different API than the 1.1 embedded cluster. Thus, you would need to rewrite your tests.
-Matthias On 4/21/18 10:30 PM, pradeep s wrote: > Thanks Matthias. Can you also please confirm the compatible versions of the > client dependencies . Our broker version is 10.2.1 and when i updgrade the > client library to 1.1.0, i am getting a issue with tests while starting the > embedded cluster . > Test dependencies are (kafka-stream.version is 1.1.0) > > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka-streams</artifactId> > <version>${kafka-stream.version}</version> > <classifier>test</classifier> > <scope>test</scope> > </dependency> > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka-clients</artifactId> > <version>${kafka-stream.version}</version> > <classifier>test</classifier> > <scope>test</scope> > </dependency> > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka_2.11</artifactId> > <version>${kafka-stream.version}</version> > <classifier>test</classifier> > <scope>test</scope> > </dependency> > > > > Test error > ======= > java.lang.AbstractMethodError: > kafka.zk.EmbeddedZookeeper.kafka$utils$Logging$_setter_$loggerName_$eq(Ljava/lang/String;)V > > at kafka.utils.Logging$class.$init$(Logging.scala:23) > at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:37) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.start(EmbeddedKafkaCluster.java:87) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.before(EmbeddedKafkaCluster.java:153) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46) > at > com.github.tomakehurst.wiremock.junit.WireMockClassRule$1.evaluate(WireMockClassRule.java:70) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > > On Sat, Apr 21, 2018 at 3:06 AM, Matthias J. Sax <matth...@confluent.io> > wrote: > >> You are hitting: https://issues.apache.org/jira/browse/KAFKA-6499 >> >> Was fixed in 1.1 release. >> >> Thus, you can just ignore the checkpoint file. There should be no issue >> with running on Kubernetes. >> >> Also, if there is no store (independent of disk based or in-memory) >> there will be no changelog topic. >> >> >> -Matthias >> >> On 4/21/18 8:34 AM, pradeep s wrote: >>> Hi, >>> I am using kafka streams app connecting to confluent kafka >> cluster(10.2.1). >>> Application is reading messages from a topic, performing a tranformation >>> and pushing to output topic . There is no count or aggregation performed >> . >>> Have following clarifications regarding state directory. >>> >>> *1)* Will there be any data written in state directory? >>> When i verified the state directory , it was showing >>> 0 >>> 0 >>> >>> *2)* Application is running in kubernetes without any external volumes . >>> Will state directory cause any processing issue during kubernetes pod >>> restarts? >>> >>> *3)* Will the app creates a changelog topic since there is no in memory >>> store used in the app? >>> >>> >>> Code Snippet >>> =========== >>> >>> Stream Config >>> ============= >>> >>> properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, >>> streamEnvironment.getKafkaBootstrapServers()); >>> properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, >>> NUMBER_OF_STREAM_THREADS); >>> properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, >> COMMIT_INTERVAL_MS); >>> >>> >>> properties.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIRECTORY); >>> properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_ >> EXCEPTION_HANDLER_CLASS_CONFIG, >>> ItemDeserializationExceptionHandler.class); >>> >>> >>> >>> Stream builder >>> ============= >>> >>> private void processStream(StreamsBuilder builder) { >>> KStream<byte[], byte[]> input = builder.stream(inputTopic, >>> Consumed.with(byteArraySerde, byteArraySerde)) >>> .peek((key, value) -> >>> metricsClient.writeMetric( >>> >>> CountMetric.generate(METRIC_OFFER_INPUT, 1))); >>> >>> KStream<byte[], DeserializationResultWrapper> >>> deserializationResultStream = input >>> .mapValues(this::deserializeOffer); >>> >>> quarantineNonDeSerializableOffers(deserializationResultStream); >>> >>> KStream<byte[], List<TransformerResult>> trans = >>> transformOffers(deserializationResultStream); >>> >>> produceToQuarantineTopic(trans); >>> >>> produceToOutputTopic(trans); >>> >>> } >>> >>> private void produceToOutputTopic(KStream<byte[], >>> List<TransformerResult>> trans) { >>> trans.filter((key, value) -> value != null >>> && !value.isEmpty()) >>> .peek((key, value) -> >>> metricsClient.writeMetric(CountMetric.generate(METRIC_ITEMS_OUTPUT, >>> 1))) >>> .flatMapValues(transformerResults -> >> transformerResults.stream() >>> >>> .map(TransformerResult::getItem) >>> >>> .filter(Objects::nonNull) >>> >>> .collect(Collectors.toCollection(ArrayList::new))) >>> .to(outputTopic, Produced.with(byteArraySerde, >> itemEnvelopeSerde)); >>> } >>> >>> private void produceToQuarantineTopic(KStream<byte[], >>> List<TransformerResult>> trans) { >>> trans.filter((key, value) -> value == null || value.isEmpty() >>> || >>> value.stream().anyMatch(TransformerResult::hasErrors)) >>> .mapValues(val -> toQuarantineEnvelope(val, INVALID_SKU)) >>> .to(quarantineTopic, Produced.with(byteArraySerde, >>> quarantineItemEnvelopeSerde)); >>> } >>> >>> Thanks >>> Pradeep >>> >> >> >
signature.asc
Description: OpenPGP digital signature