Also please note the upgrade guide https://kafka.apache.org/documentation/streams/upgrade-guide that when you upgrade from lower version to 2.4 you'd need to set the upgrade.from config, AND if you use optimization there's a small change you'd need to make as well.
On Mon, Dec 23, 2019 at 3:03 PM Guozhang Wang <wangg...@gmail.com> wrote: > Hello Nitay, > > Could you share the topology description on both 2.4 and 2.3.1, and also > could you elaborate on the feature flag you turned on / off? > > > Guozhang > > On Mon, Dec 23, 2019 at 9:32 AM Nitay Kufert <nita...@ironsrc.com> wrote: > >> Hello, >> >> So as the title says, I am trying to upgrade our streams client to the new >> version (2.4) - and when trying to run our app with the new version on my >> local machine (and our test env) i keep getting the following error: >> >> > java.lang.IllegalArgumentException: Number of partitions must be at >> least >> > 1. >> > at >> > >> org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62) >> > at >> > >> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) >> > at >> > >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) >> > at >> > >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) >> > at >> > >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) >> > at >> > >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) >> > at >> > >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) >> > at >> > >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) >> > at >> > >> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) >> > at >> > >> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) >> > at >> > >> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) >> > at >> > >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400) >> > at >> > >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) >> > at >> > >> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) >> > at >> > >> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) >> > at >> > >> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) >> > at >> > >> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) >> > at >> > >> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) >> > at >> > >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) >> > at >> > >> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) >> > >> >> The topic which this error report is a repartition topic of one of our >> state operation - meaning this is an internal topic which should be >> created >> by the app with the same number of partitions as the input topic. >> >> When running the app with 2.4 version, this topic doesn't even get >> created. >> when trying to run the app with the previous version (2.3.1) - which >> creates the topics, then switching back to 2.4 - I still get the same >> error. >> >> Following this error, it seems that this internal topic doesn't have >> "metadata" regarding the number of partitions ( >> >> repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()- >> returns false). >> >> This specific stream operation has a feature flag - so if I turn the >> feature flag OFF everything seems to work. >> >> Am I missing something? Maybe a new feature requires some new >> configurations? >> Any known bug? >> >> Help would be much appreciated >> >> Let me know if you need more information (I can try and describe our >> topology - pretty complex) >> >> Thanks! >> >> -- >> >> Nitay Kufert >> Backend Team Leader >> [image: ironSource] <http://www.ironsrc.com> >> >> email nita...@ironsrc.com >> mobile +972-54-5480021 >> fax +972-77-5448273 >> skype nitay.kufert.ssa >> 121 Menachem Begin St., Tel Aviv, Israel >> ironsrc.com <http://www.ironsrc.com> >> [image: linkedin] <https://www.linkedin.com/company/ironsource> [image: >> twitter] <https://twitter.com/ironsource> [image: facebook] >> <https://www.facebook.com/ironSource> [image: googleplus] >> <https://plus.google.com/+ironsrc> >> This email (including any attachments) is for the sole use of the intended >> recipient and may contain confidential information which may be protected >> by legal privilege. If you are not the intended recipient, or the employee >> or agent responsible for delivering it to the intended recipient, you are >> hereby notified that any use, dissemination, distribution or copying of >> this communication and/or its content is strictly prohibited. If you are >> not the intended recipient, please immediately notify us by reply email or >> by telephone, delete this email and destroy any copies. Thank you. >> > > > -- > -- Guozhang > -- -- Guozhang