[ 
https://issues.apache.org/jira/browse/KAFKA-13887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579602#comment-17579602
 ] 

Bruno Cadonna commented on KAFKA-13887:
---------------------------------------

bq. Meaning: no local store, no stateful processing that is backed on disk. Is 
this true?

No, you can have an attached storage volume. In k8s you need to use 
StatefulSets to assign the same storage volume to the same pod across restarts. 

bq. The problem is state.dir gets associated more or less randomly to the 
application instance. But what has to be ensured is the partition assignment 
for the given application instance fits the state. This means we cannot 
randomly link an application instance with its current partition assignment to 
a random state store...

The state store directory needs to be there before the assignment. Streams is 
not designed to allow changing the state store directory on the fly.

bq. if I got it correct, the state gets transferred from instance to instance in 
case of partition reassignment. But in this setup I can't imagine this can 
work.... Any input on that?

The state is transferred through the changelog topics on the Kafka brokers. The 
setup of the nodes does not matter, except that each Streams instance needs 
some sort of persistent volume to write to if you use persistent state stores. 
If you use in-memory state stores, you do not need a persistent volume.
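
To illustrate the point about one persistent volume per Streams instance, here 
is a minimal sketch of how an instance could derive its own state directory 
from a stable instance name. It assumes the volume is mounted at a fixed path 
and that the pod name is stable across restarts, as it is with a StatefulSet. 
The class name, the mount path /var/lib/kafka-streams, the application id 
orders-app and the HOSTNAME lookup are assumptions made for illustration; only 
the config keys application.id and state.dir are taken from Kafka Streams. 
Plain string keys are used so the sketch compiles without the Kafka client on 
the classpath.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
class PerInstanceStateDir {

    /**
     * Builds the part of a Streams configuration that pins one instance to
     * its own state directory below a mounted volume.
     */
    static Properties stateDirProps(String applicationId, Path volumeRoot, String instanceName) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        // Each instance gets its own subdirectory, so two instances never
        // share a state directory.
        props.setProperty("state.dir", volumeRoot.resolve(instanceName).toString());
        return props;
    }

    public static void main(String[] args) {
        // Assumption: HOSTNAME carries the stable pod name (e.g. orders-app-0).
        String instance = System.getenv().getOrDefault("HOSTNAME", "orders-app-0");
        Properties props = stateDirProps("orders-app", Paths.get("/var/lib/kafka-streams"), instance);
        System.out.println(props.getProperty("state.dir"));
    }
}
```

The resulting Properties would still need the remaining required settings 
(for example bootstrap.servers) before being passed to KafkaStreams.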

> Running multiple instance of same stateful KafkaStreams application on single 
> host raise Exception
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13887
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13887
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Sina Askarnejad
>            Priority: Minor
>
> KAFKA-10716 locks the state store directory on the running host, as it stores 
> the processId in a *kafka-streams-process-metadata* file in this path. As a 
> result, to run multiple instances of the same application on a single host, 
> each instance must run with a different *state.dir* config; otherwise the 
> following exception is raised for the second instance:
>  
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
> Unable to initialize state, this can happen if multiple instances of Kafka 
> Streams are running in the same state directory
> at org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:191)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:868)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:851)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:821)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:733)
>  
> The easiest solution is multi-threading: running a single instance with 
> multiple threads. However, multi-threading is not suitable for all 
> scenarios, e.g., when the tasks are CPU-intensive, in large-scale 
> scenarios, or when fully utilizing multi-core CPUs.
>  
> The second solution is multi-processing. On a single host this solution needs 
> extra work and supervision, as each instance needs to be run with a different 
> *state.dir*. It would be a good enhancement if KafkaStreams could handle this 
> config for multiple instances.
>  
> The proposed solution is that the KafkaStreams use the 
> */\{state.dir}/\{application.id}/\{ordinal.number}* path instead of 
> */\{state.dir}/\{application.id}* to store the meta file and states. The 
> *ordinal.number* starts at 0 and is incremented.
> When an instance starts, it checks the ordinal.number directories starting 
> from 0, finds the first subdirectory that is not locked, and uses that as its 
> state directory. This way all the tasks are assigned correctly on rebalance 
> and multiple instances can be run on a single host.
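
The scan proposed in the issue description above could be sketched roughly as 
follows. This is only an illustration of the reporter's idea using plain 
java.nio file locks; it is not how Kafka Streams behaves today. The class name 
OrdinalStateDir, the lock file name .lock and the maxInstances bound are all 
hypothetical. Note that the sketch picks whichever ordinal is free first, so 
it does not by itself guarantee that an instance finds the state matching its 
partition assignment.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the proposal, not part of Kafka Streams.
class OrdinalStateDir {

    // Directories already claimed inside this JVM. File locks are held per
    // process, so claims made within the same JVM are tracked separately.
    private static final Set<Path> CLAIMED_IN_THIS_JVM = ConcurrentHashMap.newKeySet();

    /** A claimed directory plus the channel and lock that keep it claimed. */
    static final class Claim implements AutoCloseable {
        final Path dir;
        private final FileChannel channel;
        private final FileLock lock;

        Claim(Path dir, FileChannel channel, FileLock lock) {
            this.dir = dir;
            this.channel = channel;
            this.lock = lock;
        }

        @Override
        public void close() throws IOException {
            lock.release();
            channel.close();
            CLAIMED_IN_THIS_JVM.remove(dir);
        }
    }

    /** Scans appDir/0, appDir/1, ... and claims the first unlocked one. */
    static Claim claimFirstFree(Path appDir, int maxInstances) throws IOException {
        for (int ordinal = 0; ordinal < maxInstances; ordinal++) {
            Path dir = appDir.resolve(Integer.toString(ordinal)).toAbsolutePath().normalize();
            if (!CLAIMED_IN_THIS_JVM.add(dir)) {
                continue; // already claimed by this JVM
            }
            boolean claimed = false;
            FileChannel channel = null;
            try {
                Files.createDirectories(dir);
                channel = FileChannel.open(dir.resolve(".lock"),
                        StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                FileLock lock = channel.tryLock(); // null if another process holds it
                if (lock != null) {
                    claimed = true;
                    return new Claim(dir, channel, lock);
                }
            } finally {
                if (!claimed) {
                    if (channel != null) {
                        channel.close();
                    }
                    CLAIMED_IN_THIS_JVM.remove(dir);
                }
            }
        }
        throw new IOException("No free ordinal directory under " + appDir);
    }
}
```

An instance would call claimFirstFree once at startup, use the returned dir as 
its state directory, and close the claim on shutdown so the ordinal becomes 
available again.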



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
