Kafka Streams itself is backward compatible with 0.10.2.1 brokers.

However, the embedded cluster you are using is not part of the public
API, and the 0.10.2.1 embedded cluster might expose a different API than
the 1.1 embedded cluster. Thus, you would need to rewrite your tests.
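
If you do rewrite them, note that 1.1 also ships a public testing
artifact, org.apache.kafka:kafka-streams-test-utils (test scope), so many
tests can drop the internal embedded cluster entirely. A minimal sketch of
its use, assuming a trivial pass-through topology (the topic names, class
name, and serdes below are made-up placeholders, not taken from your code):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Test;

public class PassThroughTest {

    @Test
    public void shouldForwardRecords() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through-test");
        // No broker is contacted; the value only has to be well-formed.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        // Pass-through topology; the default serdes are byte arrays, so
        // the String-serialized input below survives unchanged.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        TopologyTestDriver driver =
                new TopologyTestDriver(builder.build(), props);
        try {
            ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>("input-topic",
                            new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input-topic", "key", "value"));

            ProducerRecord<String, String> output = driver.readOutput(
                    "output-topic", new StringDeserializer(),
                    new StringDeserializer());
            // output.key() is "key" and output.value() is "value".
        } finally {
            driver.close();
        }
    }
}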

-Matthias

On 4/21/18 10:30 PM, pradeep s wrote:
> Thanks Matthias. Can you also please confirm the compatible versions of
> the client dependencies? Our broker version is 0.10.2.1, and when I
> upgrade the client library to 1.1.0, I am getting an issue with tests
> while starting the embedded cluster.
> The test dependencies are (kafka-stream.version is 1.1.0):
> 
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-streams</artifactId>
>     <version>${kafka-stream.version}</version>
>     <classifier>test</classifier>
>     <scope>test</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>${kafka-stream.version}</version>
>     <classifier>test</classifier>
>     <scope>test</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.11</artifactId>
>     <version>${kafka-stream.version}</version>
>     <classifier>test</classifier>
>     <scope>test</scope>
> </dependency>
> 
> 
> 
> Test error
> =======
> java.lang.AbstractMethodError: kafka.zk.EmbeddedZookeeper.kafka$utils$Logging$_setter_$loggerName_$eq(Ljava/lang/String;)V
>     at kafka.utils.Logging$class.$init$(Logging.scala:23)
>     at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:37)
>     at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.start(EmbeddedKafkaCluster.java:87)
>     at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.before(EmbeddedKafkaCluster.java:153)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
>     at com.github.tomakehurst.wiremock.junit.WireMockClassRule$1.evaluate(WireMockClassRule.java:70)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
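> 
> As a side note, an AbstractMethodError like this one usually points to
> binary-incompatible versions of the kafka core classes on the test
> classpath (the kafka.utils.Logging trait changed between releases). A
> quick, untested check is mvn dependency:tree -Dincludes=org.apache.kafka
> to confirm that every org.apache.kafka artifact, including transitive
> ones, resolves to the same version and Scala binary version.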
> 
> On Sat, Apr 21, 2018 at 3:06 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> You are hitting: https://issues.apache.org/jira/browse/KAFKA-6499
>>
>> It was fixed in the 1.1 release.
>>
>> Thus, you can just ignore the checkpoint file. There should be no issue
>> with running on Kubernetes.
>>
>> Also, if there is no store (whether disk-based or in-memory), there
>> will be no changelog topic.
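>>
>> As a hedged illustration (the topic names are made up, and the store
>> name in the comment follows the default naming scheme):
>>
>> StreamsBuilder builder = new StreamsBuilder();
>>
>> // Stateless: no state store, hence no changelog topic is created.
>> builder.stream("input").filter((k, v) -> v != null).to("output");
>>
>> // Stateful: count() adds a state store, which Streams backs with an
>> // internal topic named <application.id>-<storeName>-changelog.
>> builder.stream("input").groupByKey().count();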
>>
>>
>> -Matthias
>>
>> On 4/21/18 8:34 AM, pradeep s wrote:
>>> Hi,
>>> I am using a Kafka Streams app connecting to a Confluent Kafka
>>> cluster (0.10.2.1). The application is reading messages from a topic,
>>> performing a transformation, and pushing to an output topic. There is
>>> no count or aggregation performed.
>>> I have the following clarifications regarding the state directory.
>>>
>>> *1)* Will any data be written to the state directory?
>>> When I verified the state directory, it was showing:
>>> 0
>>> 0
>>>
>>> *2)* The application is running in Kubernetes without any external
>>> volumes. Will the state directory cause any processing issues during
>>> Kubernetes pod restarts?
>>>
>>> *3)* Will the app create a changelog topic, since there is no
>>> in-memory store used in the app?
>>>
>>>
>>> Code Snippet
>>> ===========
>>>
>>> Stream Config
>>> =============
>>>
>>> properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>         streamEnvironment.getKafkaBootstrapServers());
>>> properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
>>>         NUMBER_OF_STREAM_THREADS);
>>> properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>         COMMIT_INTERVAL_MS);
>>> properties.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIRECTORY);
>>> properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>>>         ItemDeserializationExceptionHandler.class);
>>>
>>>
>>>
>>> Stream builder
>>> =============
>>>
>>> private void processStream(StreamsBuilder builder) {
>>>     KStream<byte[], byte[]> input = builder
>>>             .stream(inputTopic, Consumed.with(byteArraySerde, byteArraySerde))
>>>             .peek((key, value) -> metricsClient.writeMetric(
>>>                     CountMetric.generate(METRIC_OFFER_INPUT, 1)));
>>>
>>>     KStream<byte[], DeserializationResultWrapper> deserializationResultStream =
>>>             input.mapValues(this::deserializeOffer);
>>>
>>>     quarantineNonDeSerializableOffers(deserializationResultStream);
>>>
>>>     KStream<byte[], List<TransformerResult>> trans =
>>>             transformOffers(deserializationResultStream);
>>>
>>>     produceToQuarantineTopic(trans);
>>>
>>>     produceToOutputTopic(trans);
>>> }
>>>
>>> private void produceToOutputTopic(KStream<byte[], List<TransformerResult>> trans) {
>>>     trans.filter((key, value) -> value != null && !value.isEmpty())
>>>          .peek((key, value) -> metricsClient.writeMetric(
>>>                  CountMetric.generate(METRIC_ITEMS_OUTPUT, 1)))
>>>          .flatMapValues(transformerResults -> transformerResults.stream()
>>>                  .map(TransformerResult::getItem)
>>>                  .filter(Objects::nonNull)
>>>                  .collect(Collectors.toCollection(ArrayList::new)))
>>>          .to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde));
>>> }
>>>
>>> private void produceToQuarantineTopic(KStream<byte[], List<TransformerResult>> trans) {
>>>     trans.filter((key, value) -> value == null || value.isEmpty()
>>>                                  || value.stream().anyMatch(TransformerResult::hasErrors))
>>>          .mapValues(val -> toQuarantineEnvelope(val, INVALID_SKU))
>>>          .to(quarantineTopic, Produced.with(byteArraySerde, quarantineItemEnvelopeSerde));
>>> }
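>>>
>>> To double-check question 3 against the actual topology, one small,
>>> hypothetical addition (assuming the same builder as above) is to print
>>> the topology description, which lists every state store:
>>>
>>> Topology topology = builder.build();
>>> // For this stateless pipeline the description should list sources,
>>> // processors, and sinks, but no state stores.
>>> System.out.println(topology.describe());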
>>>
>>> Thanks
>>> Pradeep
>>>
>>
>>
> 
