[ https://issues.apache.org/jira/browse/KAFKA-13887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579032#comment-17579032 ]

Sylwester Kras commented on KAFKA-13887:
----------------------------------------

Hi all,

Thanks for this post. As [~cadonna] has already mentioned, I also don't
understand why somebody would prefer to run multiple instances of a Streams
application (on the same machine!) instead of increasing the number of threads.
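
For reference, here is a minimal Java sketch of the multi-threaded alternative. It assumes the usual Properties-based setup. A single KafkaStreams instance gets more stream threads through num.stream.threads (default 1) instead of a second process being started on the same host. The keys are the string values of the StreamsConfig constants APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG, STATE_DIR_CONFIG and NUM_STREAM_THREADS_CONFIG. The class name, helper method and example values are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: scale one Kafka Streams instance with more stream
// threads instead of starting more processes on the same host.
// Keys are the string values of the corresponding StreamsConfig constants.
final class ThreadScalingConfig {

    static Properties build(String applicationId, String bootstrapServers,
                            String stateDir, int numThreads) {
        if (numThreads < 1) {
            throw new IllegalArgumentException("numThreads must be >= 1");
        }
        Properties props = new Properties();
        props.put("application.id", applicationId);       // APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // BOOTSTRAP_SERVERS_CONFIG
        // One instance, one state directory shared by all of its threads.
        props.put("state.dir", stateDir);                 // STATE_DIR_CONFIG
        props.put("num.stream.threads", Integer.toString(numThreads)); // NUM_STREAM_THREADS_CONFIG
        return props;
    }
}
```

The resulting Properties would be passed to new KafkaStreams(topology, props). All threads of that single instance share one state directory, so no second process competes for the directory lock.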

However, the post is interesting and reminds me of an issue that might be
solvable with this approach.

Imagine you run multiple instances of the same Streams application on Kubernetes
nodes. If I understand correctly, what you need for stateful processing that is
backed by disk is individual storage per instance. If your pods don't have
persistent storage (i.e., storage that survives restarts), you don't really
have the option to configure your state.dir. Meaning: no local store, and no
disk-backed stateful processing. Is this true?

One solution could be to attach one common disk (from your cloud provider) to
all your instances and use the mechanism mentioned above to provide a separate
state.dir to every application instance.

Is this something that could work? The problem is that the state.dir gets
associated with an application instance more or less randomly. But what has to
be ensured is that the partition assignment of a given application instance
matches its state. This means we cannot randomly link an application instance,
with its current partition assignment, to an arbitrary state store...

If I understand correctly, the state gets transferred from instance to instance
when partitions are reassigned. But I can't imagine how this would work in this
setup... Any input on that?

How can stateful processing with state on disk be implemented when the pods do
not offer storage? (Actually, there is storage, but it is too slow for RocksDB.)

> Running multiple instance of same stateful KafkaStreams application on single 
> host raise Exception
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13887
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13887
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Sina Askarnejad
>            Priority: Minor
>
> KAFKA-10716 locks the state store directory on the running host, as it stores
> the processId in a *kafka-streams-process-metadata* file in this path. As a
> result, to run multiple instances of the same application on a single host,
> each instance must run with a different *state.dir* config; otherwise the
> following exception is raised for the second instance:
>  
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
>     at org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:191)
>     at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:868)
>     at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:851)
>     at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:821)
>     at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:733)
>  
> The easiest solution is multi-threading: running a single instance with
> multiple threads. But multi-threaded programming is not suitable for all
> scenarios, e.g., when the tasks are CPU-intensive, in large-scale scenarios,
> or when fully utilizing multi-core CPUs.
>  
> The second solution is multi-processing. On a single host, this solution
> needs extra work and care, as each instance must be run with a different
> {*}state.dir{*}. It would be a good enhancement if KafkaStreams could handle
> this config for multiple instances.
>  
> The proposed solution is for KafkaStreams to use the
> */\{state.dir}/\{application.id}/\{ordinal.number}* path instead of
> */\{state.dir}/\{application.id}* to store the metadata file and state. The
> *ordinal.number* starts at 0 and is incremented (0, 1, 2, ...).
> When an instance starts, it checks the ordinal.number directories starting
> from 0, finds the first subdirectory that is not locked, and uses that as its
> state directory. This way, all tasks are assigned correctly on rebalance, and
> multiple instances can run on a single host.
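
The ordinal-directory proposal above could be sketched in Java roughly as follows. This is a hypothetical illustration of the idea, not how KafkaStreams implements its state directory lock. The class OrdinalStateDirectory, the lock-file name .instance.lock and the maxOrdinal bound are all assumptions. The sketch relies on FileChannel.tryLock, which returns null when another process holds the lock. Such locks are held per JVM, so a static set additionally skips directories that are already claimed inside the same JVM.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the KAFKA-13887 proposal: claim the first unlocked
// {state.dir}/{application.id}/{ordinal} directory, starting at ordinal 0.
// Not Kafka's implementation; the lock-file name below is an assumption.
final class OrdinalStateDirectory implements AutoCloseable {

    private static final String LOCK_FILE_NAME = ".instance.lock";

    // FileChannel locks are held per JVM, so track directories claimed by
    // this JVM explicitly and never open a second channel on their lock file.
    private static final Set<Path> CLAIMED_IN_THIS_JVM = ConcurrentHashMap.newKeySet();

    private final Path directory;
    private final FileChannel channel;
    private final FileLock lock;

    private OrdinalStateDirectory(Path directory, FileChannel channel, FileLock lock) {
        this.directory = directory;
        this.channel = channel;
        this.lock = lock;
    }

    static OrdinalStateDirectory claim(Path stateDir, String applicationId, int maxOrdinal)
            throws IOException {
        Path appDir = stateDir.resolve(applicationId).toAbsolutePath().normalize();
        for (int ordinal = 0; ordinal < maxOrdinal; ordinal++) {
            Path candidate = appDir.resolve(Integer.toString(ordinal));
            if (!CLAIMED_IN_THIS_JVM.add(candidate)) {
                continue; // already owned by another instance in this JVM
            }
            FileChannel channel = null;
            FileLock lock = null;
            try {
                Files.createDirectories(candidate);
                channel = FileChannel.open(candidate.resolve(LOCK_FILE_NAME),
                        StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                // tryLock returns null if another process holds this ordinal
                lock = channel.tryLock();
            } finally {
                if (lock == null) {
                    // Not claimed: close our channel first, then drop the bookkeeping.
                    try {
                        if (channel != null) {
                            channel.close();
                        }
                    } finally {
                        CLAIMED_IN_THIS_JVM.remove(candidate);
                    }
                }
            }
            if (lock != null) {
                return new OrdinalStateDirectory(candidate, channel, lock);
            }
        }
        throw new IOException("No unlocked ordinal directory below " + maxOrdinal
                + " in " + appDir);
    }

    Path directory() {
        return directory;
    }

    @Override
    public void close() throws IOException {
        try {
            lock.release();
            channel.close();
        } finally {
            CLAIMED_IN_THIS_JVM.remove(directory);
        }
    }
}
```

In the proposal, this lookup would happen inside KafkaStreams at startup. The sketch covers only the claiming step. An instance keeps its ordinal until close() is called, after which the next instance to start can reuse it.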



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
