Hi Matthias,

Thanks for the reply! The binder expects an array, and its documentation on multiple output bindings uses the new branching methods without noting the possible "out of order" bindings (it uses the branch map values to create an array, like in my code snippet).
For now, I've manually provided an array built using named branches to get my desired bindings. I didn't see any plans from the binder on changing how multiple output bindings will bind, but I'll slink over to ask about that, or at least about updating the example in the docs.

________________________________
From: Matthias J. Sax <mj...@apache.org>
Sent: Thursday, June 20, 2024 9:25:10 PM
To: users@kafka.apache.org <users@kafka.apache.org>
Subject: Re: Kafka Streams branching return type affects bindings when using multiple output bindings: intentional behaviour?

Jenny,

thanks for reaching out.

Yes, we plan to remove the old variant of `branch()` (which was deprecated with KIP-418) in the 4.0 release (https://issues.apache.org/jira/browse/KAFKA-12824), which is planned for the end of this year.

KIP-418 changes the pattern from relying on order to relying on *names*. The KIP explains the naming strategy in detail:

> The keys of the Map entries are defined by the following rules:
>
> - If Named parameter was provided for split, its value is used as a prefix
>   for each key. By default, no prefix is used
> - If a name is provided for the branch, its value is appended to the prefix
>   to form the Map key
> - If a name is not provided for the branch, then the key defaults to prefix +
>   position of the branch as a decimal number, starting from "1"
> - If a name is not provided for the defaultBranch call, then the key defaults
>   to prefix + "0"
>
> The values of the Map entries are formed as following:
>
> - If no chain function or consumer is provided, then the value is the branch
>   itself (which is equivalent to ks→ks identity chain function)
> - If a chain function is provided and returns a non-null value for a given
>   branch, then the value is the result returned by this function
> - If a chain function returns null for a given branch, then the respective
>   entry is not put to the map
> - If a consumer is provided for a given branch, then the respective entry
>   is not put to the map

To me, it seems to be an issue with Spring rather than with Kafka Streams, if the corresponding binder does not use the new API in the intended way?

In the end, why is the binder still returning an array anyway instead of a `Map`? And if it does need to return an array, why does it "naively" create the array via `streamMap.values().toArray(new KStream[0])`, introducing the problem? It seems Spring could implement the binder using more sophisticated logic to fix the problem (or just return the `Map` instead of an array to begin with)?

HTH.

-Matthias

On 6/20/24 11:02 AM, Jenny Qiao wrote:
> Hello,
>
> KIP-418 (>=2.8) improved the branching return type from KStream<>[] to
> HashMap. This changes the output binding when using multiple branches in the
> Kafka Streams binder of Spring Cloud Stream: the bindings are possibly wrong
> because the map storing the KStream branches is unordered.
>
> Before KIP-418, if I have multiple branches, my function could look like this:
>
>     public Function<KStream<String, String>, KStream<String, String>[]> process() {
>         Predicate<String, String> isHappy = (k, v) -> v.contains(":D");
>         Predicate<String, String> isSad = (k, v) -> v.contains(":(");
>         Predicate<String, String> isOwo = (k, v) -> v.toLowerCase().contains("owo");
>         return input -> input.branch(isHappy, isSad, isOwo);
>     }
>
> There is an output binding for each branch. Since the return type is a
> KStream[] where the order of branches matches the supplied order, the
> corresponding output bindings are process-out-0 (isHappy), process-out-1
> (isSad), process-out-2 (isOwo).
>
> This changes when using the new branching impl:
>
>     public Function<KStream<String, String>, KStream<String, String>[]> process() {
>         return input -> {
>             final Map<String, KStream<String, String>> streamMap = input
>                 .split()
>                 .branch(isHappy)
>                 .branch(isSad)
>                 .branch(isOwo)
>                 .noDefaultBranch();
>             return streamMap.values().toArray(new KStream[0]);
>         };
>     }
>
> The call to HashMap<>.values() doesn't necessarily return the KStreams in
> supplied/insertion order. For example, if the order of KStreams in the return
> corresponds to isSad, isOwo, isHappy, the output bindings would become
> process-out-0 (isSad), process-out-1 (isOwo), process-out-2 (isHappy), which
> is different from expected. If the map were ordered like a LinkedHashMap, the
> return order and thus the bindings would be correct.
>
> Is this intended behaviour with KIP-418? If so, will the deprecated branching
> methods be kept indefinitely for multiple output bindings to bind correctly?
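
For readers of the archive, the "array built using named branches" workaround mentioned at the top of this thread could look roughly like the following. This is only a minimal sketch, not the code actually used in the thread: the class `BranchOrdering`, the method `inOrder`, the prefix "mood-" and the branch names are all hypothetical, and plain Strings stand in for KStream objects so the example runs without Kafka on the classpath. The idea is to look up each branch by its map key in an explicit order instead of relying on the iteration order of `values()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not part of Kafka Streams or Spring Cloud Stream.
final class BranchOrdering {

    private BranchOrdering() {}

    // Returns the map values in the order of the given names, so the result
    // does not depend on the map's iteration order.
    static <T> List<T> inOrder(Map<String, T> byName, String... names) {
        List<T> ordered = new ArrayList<>(names.length);
        for (String name : names) {
            T value = byName.get(name);
            if (value == null) {
                // Fail loudly on a missing or misspelled branch name.
                throw new IllegalArgumentException(
                    "No branch named '" + name + "'; known names: " + byName.keySet());
            }
            ordered.add(value);
        }
        return ordered;
    }

    public static void main(String[] args) {
        // Stand-in for the Map<String, KStream<K, V>> returned by noDefaultBranch().
        // The keys assume a "mood-" prefix plus a name per branch (assumed names).
        Map<String, String> streamMap = new HashMap<>();
        streamMap.put("mood-owo", "owoStream");
        streamMap.put("mood-happy", "happyStream");
        streamMap.put("mood-sad", "sadStream");

        List<String> ordered = inOrder(streamMap, "mood-happy", "mood-sad", "mood-owo");
        System.out.println(ordered); // prints [happyStream, sadStream, owoStream]
    }
}
```

With real streams, the keys would come from the naming rules quoted above, for example `split(Named.as("mood-"))` combined with `branch(isHappy, Branched.as("happy"))`, and the function would return `inOrder(streamMap, "mood-happy", "mood-sad", "mood-owo").toArray(new KStream[0])`. For unnamed branches without a prefix, the same rules give the keys "1", "2" and "3", so `inOrder(streamMap, "1", "2", "3")` should work with the snippet in the original message.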