[jira] [Created] (KAFKA-13887) Running multiple instances of the same stateful KafkaStreams application on a single host raises an exception
Sina Askarnejad created KAFKA-13887:
---------------------------------------
             Summary: Running multiple instances of the same stateful KafkaStreams application on a single host raises an exception
                 Key: KAFKA-13887
                 URL: https://issues.apache.org/jira/browse/KAFKA-13887
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.6.0
            Reporter: Sina Askarnejad

KAFKA-10716 locks the state store directory on the running host, as it stores the processId in a *kafka-streams-process-metadata* file in this path. As a result, to run multiple instances of the same application on a single host, each instance must run with a different *state.dir* config; otherwise the following exception is raised for the second instance:

Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
	at org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:191)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:868)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:851)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:821)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:733)

The easiest solution is multi-threading: run a single instance with multiple threads. But multi-threading is not suitable for all scenarios, e.g., when the tasks are CPU intensive, in large-scale deployments, or when fully utilizing multi-core CPUs.

The second solution is multi-processing. On a single host this needs extra work and supervision, as each instance must be run with a different *state.dir*. It would be a good enhancement if KafkaStreams could handle this config for multiple instances.

The proposed solution is for KafkaStreams to use the */{state.dir}/{application.id}/{ordinal.number}* path instead of */{state.dir}/{application.id}* to store the meta file and states.

The *ordinal.number* starts at 0 and increments. When an instance starts, it checks the ordinal.number directories starting from 0, finds the first subdirectory that is not locked, and uses that as its state directory. This way all tasks are assigned correctly on rebalance, and multiple instances can run on a single host.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
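The probing scheme proposed above could be sketched with plain `java.nio` file locks. This is an illustrative sketch of the proposal, not Kafka Streams code; the class and method names (`OrdinalStateDir`, `claimFirstUnlocked`) are invented for the example:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of the proposed probing logic: scan
// <state.dir>/<application.id>/0, 1, 2, ... and claim the first
// subdirectory whose lock file is not held by another process.
public class OrdinalStateDir {
    static final String LOCK_FILE = ".lock";

    // Returns the first ordinal subdirectory that could be locked,
    // creating it if necessary. The FileLock is intentionally kept open:
    // it must stay held for the lifetime of the instance. Note that
    // tryLock() returns null only for locks held by *other* processes;
    // within one JVM it throws OverlappingFileLockException instead.
    public static Path claimFirstUnlocked(Path appStateDir) throws IOException {
        for (int ordinal = 0; ; ordinal++) {
            Path candidate = appStateDir.resolve(Integer.toString(ordinal));
            Files.createDirectories(candidate);
            FileChannel channel = FileChannel.open(
                candidate.resolve(LOCK_FILE),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            FileLock lock = channel.tryLock();
            if (lock != null) {
                return candidate; // lock stays held until the process exits
            }
            channel.close();      // another process owns this ordinal; try the next
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("app-id");
        Path first = claimFirstUnlocked(base);
        System.out.println(first.getFileName()); // first claim in a fresh dir is "0"
    }
}
```

A second process running the same loop against the same *state.dir* would find ordinal 0 locked and fall through to 1, which is exactly the rebalance-friendly behavior the issue asks for.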
Re: [DISCUSS] KIP-834: Pause / Resume KafkaStreams Topologies
Thanks for the KIP, Jim!

This conversation seems to highlight that the KIP needs to specify some of its behavior as well as its APIs, where the behavior is observable and significant to users. For example:

1. Do you plan to have a guarantee that immediately after calling KafkaStreams.pause(), users should observe that the instance stops processing new records? Or should they expect that the threads will continue to process some records and pause asynchronously (you already answered this in the thread earlier)?

2. Will the threads continue to poll new records until they naturally fill up the task buffers, or will they immediately pause their Consumers as well?

3. Will threads continue to call (system-time) punctuators, or would punctuations also be paused?

I realize that some of those questions simply may not have occurred to you, so this is not a criticism for leaving them off; I'm just pointing out that although we don't tend to mention implementation details in KIPs, we also can't be too high-level, since there are a lot of operational details that users rely on to achieve various behaviors in Streams.

A couple more comments:

4. +1 to what Guozhang said. It seems like we should also do a commit before entering the paused state. That way, any open transactions would be closed and would not have to worry about timing out. Even under ALOS, it seems best to go ahead and complete the processing of in-flight records by committing. That way, if anything happens to die while it's paused, existing work won't have to be repeated. Plus, if there are any processors with side effects, users won't have to tolerate weird edge cases where a pause occurs after a processor sees a record, but before the result is sent to its outputs.

5. I noticed that you proposed not to add a PAUSED state, but I didn't follow the rationale. Adding a state seems beneficial for a number of reasons: StreamThreads already use the thread state to determine whether to process or not, so avoiding a new State would just mean adding a separate flag to track and then checking your new flag in addition to the State in the thread. Also, operating Streams applications is a non-trivial task, and users rely on the State (and transitions) to understand Streams's behavior. Adding a PAUSED state is an elegant way to communicate to operators what is happening with the application. Note that the person digging through logs and metrics, trying to understand why the application isn't doing anything, is probably not going to be the same person who called pause() and resume(). Also, if you add a state, you don't need `isPaused()`.

5b. If you buy the arguments to go ahead and commit as well as the argument to add a State, then I'd also suggest following the existing patterns for the shutdown states by also adding PAUSING. That way, you'll also expose a way to understand that Streams received the signal to pause, and that it's still processing and committing some records in preparation to enter a PAUSED state. I'm not sure if a RESUMING state would also make sense.

And that's all I have to say about that. I hope you don't find my long message off-putting. I'm fundamentally in favor of your KIP, and I think with a little more explanation in the KIP, and a few small tweaks to the proposal, we'll be able to provide good ergonomics to our users.

Thanks,
-John

On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote:
> I'm in favor of the "just pausing the instance itself" option as well. As
> for EOS, the point is that when the processing is paused, we would not
> trigger any `producer.send` during the time, and the transaction timeout is
> sort of relying on that behavior, so my point was that it's probably better
> to also commit the processing before we pause it.
>
> Guozhang
>
> On Fri, May 6, 2022 at 6:12 PM Jim Hughes
> wrote:
>
>> Hi Matthias,
>>
>> Since the only thing which will be paused is processing the topology, I
>> think we can let commits happen naturally.
>>
>> Good point about getting the paused state to new members; it is seeming
>> like the "building block" approach is a good one to keep things simple at
>> first.
>>
>> Cheers,
>>
>> Jim
>>
>> On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax wrote:
>>
>> > I think it's tricky to propagate a pauseAll() via the rebalance
>> > protocol. New members joining the group would need to get paused, too?
>> > Could there be weird race conditions with overlapping pauseAll() and
>> > resumeAll() calls on different instances while there could be errors /
>> > network partitions or similar?
>> >
>> > I would argue that similar to IQ, we provide the basic building blocks,
>> > and leave it to the users to implement cross-instance management for a
>> > pauseAll() scenario. -- Also, if there is really demand, we can always
>> > add pauseAll()/resumeAll() as follow-up work.
>> >
>> > About named topologies: I agree with Jim to not include them in this KIP
>> > as they are not a public feature yet. If we mak
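The state-machine shape John describes (a single thread State with PAUSING and PAUSED, which makes a separate `isPaused()` flag redundant) can be sketched as a toy model. This is not the actual StreamThread code; all names here are invented for illustration:

```java
// Toy state machine illustrating the PAUSING/PAUSED states suggested in
// the thread: the processing loop consults a single State, commits
// in-flight work on PAUSING (so EOS transactions cannot time out), then
// idles in PAUSED. isPaused() falls out of the State for free.
public class PausableLoop {
    enum State { RUNNING, PAUSING, PAUSED }

    private volatile State state = State.RUNNING;
    int processed = 0; // records processed
    int commits = 0;   // commits performed

    void pause()  { if (state == State.RUNNING) state = State.PAUSING; }
    void resume() { if (state == State.PAUSED)  state = State.RUNNING; }
    boolean isPaused() { return state == State.PAUSED; }

    // One iteration of the (heavily simplified) processing loop.
    void runOnce() {
        if (state == State.RUNNING) {
            processed++;              // normal processing
        } else if (state == State.PAUSING) {
            commits++;                // commit open work first, as per
            state = State.PAUSED;     // point 4 in the discussion
        }                             // PAUSED: idle, process nothing
    }

    public static void main(String[] args) {
        PausableLoop loop = new PausableLoop();
        loop.runOnce();   // processes a record
        loop.pause();     // request pause: RUNNING -> PAUSING
        loop.runOnce();   // commits, then PAUSING -> PAUSED
        loop.runOnce();   // no-op while PAUSED
        loop.resume();
        loop.runOnce();   // processing again
        System.out.println(loop.processed + "/" + loop.commits); // prints 2/1
    }
}
```

An operator watching the State transitions (RUNNING, PAUSING, PAUSED) can tell whether the pause signal was received and whether the final commit has completed, which is exactly the observability argument made above.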
Re: [DISCUSS] KIP-832 Allow creating a producer/consumer using a producer/consumer config
Thanks, François! Those changes look good to me.

Thanks,
-John

On Fri, May 6, 2022, at 13:51, François Rosière wrote:
> The KIP has been updated to reflect the last discussion
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882578#KIP832:Allowcreatingaproducer/consumerusingaproducer/consumerconfig-ProposedChanges
>
> Le ven. 6 mai 2022 à 20:44, François Rosière a
> écrit :
>
>> Hello,
>>
>> No problem to also add a constructor taking the StreamsConfig in the
>> TopologyTestDriver.
>>
>> Summary of the changes to apply:
>>
>>    - Create 2 new constructors in KafkaProducer
>>    - Create a new constructor in KafkaConsumer and increase the visibility
>>      of an existing one
>>    - Create a new constructor in TopologyTestDriver
>>
>> Kr,
>>
>> F.
>>
>> Le ven. 6 mai 2022 à 16:57, John Roesler a écrit :
>>
>>> Thanks for the KIP, François!
>>>
>>> I'm generally in favor of your KIP, since you're
>>> proposing to follow the existing pattern of the
>>> constructors for both Producer and Consumer,
>>> but with the config object instead of Properties
>>> or Map configs. Also, because we already have
>>> this pattern in Streams, and we are just
>>> extending it to Producer and Consumer.
>>>
>>> Following on the KIP-378 discussion, I do still think
>>> this is somewhat of an abuse of the Config objects,
>>> and it would be better to have a formal dependency
>>> injection interface, but I also don't want to let perfect
>>> be the enemy of good. Since it looks like this approach
>>> works, and there is also some precedent for it already,
>>> I'd be inclined to approve it.
>>>
>>> Since KIP-378 didn't make it over the finish line, and it
>>> seems like a small expansion to your proposal, do you
>>> mind also adding the StreamsConfig to the
>>> TopologyTestDriver constructors? That way, we can go
>>> ahead and resolve both KIPs at once.
>>>
>>> Thank you,
>>> -John
>>>
>>> On Fri, May 6, 2022, at 06:06, François Rosière wrote:
>>> > To stay consistent with existing code, we should simply add 2
>>> > constructors: one with ser/deser and one without.
>>> > So that users have the choice to use one or the other.
>>> > I updated the KIP accordingly.
>>> >
>>> > Le ven. 6 mai 2022 à 12:55, François Rosière <
>>> francois.rosi...@gmail.com> a
>>> > écrit :
>>> >
>>> >> On the other hand, the KafkaConsumer constructor with a config +
>>> >> serializer and deserializer already exists but is not public.
>>> >> It would also complicate the caller a bit to not have the
>>> >> serializer/deserializer exposed at constructor level.
>>> >>
>>> >> Once the KIP has been implemented, for streams, instead of having a
>>> >> custom config (already possible), I may simply define a custom
>>> >> KafkaClientSupplier reusing the custom configs of both the producer
>>> >> and the consumer.
>>> >> This supplier currently creates producers and consumers using the
>>> >> constructors with a map of config + serializer/deserializer.
>>> >>
>>> >> So, it seems it's easier to have the constructor with 3 parameters.
>>> >> But in any case, it will work if the config can be accessed...
>>> >>
>>> >> Le ven. 6 mai 2022 à 12:14, François Rosière <
>>> francois.rosi...@gmail.com>
>>> >> a écrit :
>>> >>
>>> >>> Hello,
>>> >>>
>>> >>> We may create a constructor with a single parameter which is the
>>> >>> config, but then I would need to give the serializer/deserializer
>>> >>> by also overriding the config,
>>> >>> like I would do for the interceptors.
>>> >>> So, no real opinion on that; both solutions are ok for me.
>>> >>> Maybe easier to take the approach of the single parameter.
>>> >>>
>>> >>> Hope that answers the question.
>>> >>>
>>> >>> Kr,
>>> >>>
>>> >>> F.
>>> >>>
>>> >>> Le ven. 6 mai 2022 à 11:59, Bruno Cadonna a
>>> écrit :
>>>
>>> Hi François,
>>>
>>> Thank you for updating the KIP!
>>>
>>> Now the motivation of the KIP is much clearer.
>>>
>>> I would still be interested in:
>>>
>>> >> 2. Why do you only want to change/add the constructors that take the
>>> >> properties objects and de/serializers, and you do not also want to
>>> >> add/change the constructors that take only the properties?
>>>
>>> Best,
>>> Bruno
>>>
>>> On 05.05.22 23:15, François Rosière wrote:
>>> > Hello Bruno,
>>> >
>>> > The KIP has been updated. Feel free to give more feedback and I will
>>> > complete accordingly.
>>> >
>>> > Kr,
>>> >
>>> > F.
>>> >
>>> > Le jeu. 5 mai 2022 à 22:22, Bruno Cadonna a
>>> écrit :
>>> >
>>> >> Hi François,
>>> >>
>>> >> Thanks for the KIP!
>>> >>
>>> >> Here is my first feedback:
>>> >>
>>> >> 1. Could you please extend the motivation section, so that it is
>>> >> clear for a non-Spring dev why the change is needed? Usually, a
>>> >> motivation section bene
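The constructor pattern under discussion in this thread (clients accepting a config *object*, which a dependency-injection framework can subclass, rather than only raw Properties) can be illustrated with a hand-rolled sketch. All class names here are invented stand-ins, not the actual KafkaProducer/KafkaConsumer API:

```java
import java.util.Properties;

// Sketch of the KIP-832 idea: a Client offers a constructor taking a
// ClientConfig object in addition to the traditional Properties one,
// so a framework (e.g. Spring) can pass a subclass that resolves some
// values, such as interceptor instances, from its own container.
public class ConfigCtorSketch {
    static class ClientConfig {
        final Properties props;
        ClientConfig(Properties props) { this.props = props; }
        // Extension point: a subclass can override lookup behavior.
        Object get(String key) { return props.get(key); }
    }

    static class Client {
        final ClientConfig config;
        Client(Properties props)   { this(new ClientConfig(props)); } // existing style
        Client(ClientConfig config) { this.config = config; }         // proposed style
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("client.id", "demo");
        // A DI-aware config subclass decorating lookups, to show why
        // exposing the config-object constructor matters.
        ClientConfig custom = new ClientConfig(p) {
            @Override
            Object get(String key) { return "injected-" + super.get(key); }
        };
        Client c = new Client(custom);
        System.out.println(c.config.get("client.id")); // prints injected-demo
    }
}
```

The Properties-based constructor would lose the subclass behavior (it wraps the values in a fresh ClientConfig), which is the core of the motivation discussed above.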