You are hitting: https://issues.apache.org/jira/browse/KAFKA-6499

It was fixed in the 1.1 release.

Thus, you can just ignore the checkpoint file. There should be no issue
with running on Kubernetes.
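
To make that explicit, here is a minimal sketch (the path below is only an
assumption, not taken from your config): you can point state.dir at a
pod-local, ephemeral directory, because a stateless topology writes nothing
there that needs to survive a pod restart:

    // hypothetical example: an ephemeral pod-local path is fine for a
    // stateless (transform-only) topology
    properties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");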

Also, if there is no store (regardless of whether it is disk-based or
in-memory), there will be no changelog topic.
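
If you want to verify that, a minimal sketch (assuming Kafka Streams 1.0+
and the StreamsBuilder from your quoted code) is to print the topology
description; for a transform-only app every processor should report an
empty store set, so Streams will not create any changelog topics:

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyDescription;

    // hypothetical check against the builder from the quoted code
    Topology topology = builder.build();
    for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) {
        for (TopologyDescription.Node node : subtopology.nodes()) {
            if (node instanceof TopologyDescription.Processor) {
                // expected to print empty store sets for a stateless topology
                System.out.println(node.name() + " stores: "
                        + ((TopologyDescription.Processor) node).stores());
            }
        }
    }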


-Matthias

On 4/21/18 8:34 AM, pradeep s wrote:
> Hi,
> I am using a Kafka Streams app connecting to a Confluent Kafka cluster (10.2.1).
> The application reads messages from a topic, performs a transformation,
> and pushes them to an output topic. There is no count or aggregation performed.
> I have the following clarifications regarding the state directory:
> 
> *1)* Will there be any data written in the state directory?
> When I verified the state directory, it was showing
> 0
> 0
> 
> *2)* The application is running in Kubernetes without any external volumes.
> Will the state directory cause any processing issues during Kubernetes pod
> restarts?
> 
> *3)* Will the app create a changelog topic, since there is no in-memory
> store used in the app?
> 
> 
> Code Snippet
> ===========
> 
> Stream Config
> =============
> 
> properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>         streamEnvironment.getKafkaBootstrapServers());
> properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUMBER_OF_STREAM_THREADS);
> properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
> properties.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIRECTORY);
> properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>         ItemDeserializationExceptionHandler.class);
> 
> 
> 
> Stream builder
> =============
> 
> private void processStream(StreamsBuilder builder) {
>     KStream<byte[], byte[]> input = builder
>             .stream(inputTopic, Consumed.with(byteArraySerde, byteArraySerde))
>             .peek((key, value) -> metricsClient.writeMetric(
>                     CountMetric.generate(METRIC_OFFER_INPUT, 1)));
> 
>     KStream<byte[], DeserializationResultWrapper> deserializationResultStream =
>             input.mapValues(this::deserializeOffer);
> 
>     quarantineNonDeSerializableOffers(deserializationResultStream);
> 
>     KStream<byte[], List<TransformerResult>> trans =
>             transformOffers(deserializationResultStream);
> 
>     produceToQuarantineTopic(trans);
> 
>     produceToOutputTopic(trans);
> }
> 
> private void produceToOutputTopic(KStream<byte[], List<TransformerResult>> trans) {
>     trans.filter((key, value) -> value != null && !value.isEmpty())
>          .peek((key, value) -> metricsClient.writeMetric(
>                  CountMetric.generate(METRIC_ITEMS_OUTPUT, 1)))
>          .flatMapValues(transformerResults -> transformerResults.stream()
>                  .map(TransformerResult::getItem)
>                  .filter(Objects::nonNull)
>                  .collect(Collectors.toCollection(ArrayList::new)))
>          .to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde));
> }
> 
> private void produceToQuarantineTopic(KStream<byte[], List<TransformerResult>> trans) {
>     trans.filter((key, value) -> value == null || value.isEmpty()
>                                  || value.stream().anyMatch(TransformerResult::hasErrors))
>          .mapValues(val -> toQuarantineEnvelope(val, INVALID_SKU))
>          .to(quarantineTopic, Produced.with(byteArraySerde, quarantineItemEnvelopeSerde));
> }
> 
> Thanks
> Pradeep
> 
