Thanks Nitay. This is very helpful for our investigation, will look into the ticket.
Guozhang On Thu, Dec 26, 2019 at 12:19 PM Matthias J. Sax <matth...@confluent.io> wrote: > Thanks Nitay! We will investigate! > > > -Matthias > > On 12/26/19 2:53 AM, Nitay Kufert wrote: > > Also posted it at apache jira: > > https://issues.apache.org/jira/browse/KAFKA-9335 > > > > On Thu, Dec 26, 2019 at 12:41 PM Nitay Kufert <nita...@ironsrc.com> > wrote: > > > >> I have made a "toy" example to reproduce this error, this is more or > less > >> what's going on in our application: > >>> > >>> package com.supersonic.bos.consumer > >>> > >>> import java.time.Duration > >>> import java.time.temporal.ChronoUnit > >>> import java.util.Properties > >>> import org.apache.kafka.common.serialization.Serde > >>> import org.apache.kafka.streams.kstream.TimeWindows > >>> import org.apache.kafka.streams.scala.kstream._ > >>> import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, > ByteArrayWindowStore, Serdes, StreamsBuilder} > >>> import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} > >>> import scala.util.{Random, Try} > >>> > >>> object MissingPartitionExample extends App { > >>> > >>> implicit val stringSerde: Serde[String] = Serdes.String > >>> implicit val integerSerde: Serde[Integer] = Serdes.JavaInteger > >>> implicit val intSerde: Serde[Int] = Serdes.Integer > >>> > >>> val config: Properties = { > >>> val p = new Properties() > >>> p.put(StreamsConfig.APPLICATION_ID_CONFIG, > "wordcount-scala-application") > >>> val bootstrapServers = if (args.length > 0) args(0) else > "localhost:9092" > >>> p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) > >>> p > >>> } > >>> > >>> val builder = new StreamsBuilder() > >>> val input: KStream[String, String] = builder.stream[String, > String]("input_numbers_topic")(Consumed.`with`(Serdes.String, > Serdes.String)) > >>> val branched = input > >>> .peek((k, v) => println(s"input: $k -> $v")) > >>> .flatMapValues(str => Try(str.toInt).toOption) > >>> .branch( > >>> (_, v) => v % 2 == 0, > >>> (_, v) => v % 2 != 0 > >>> ) > >>> > >>> val r = Random.nextInt(2) > >>> > >>> val i1 = branched(0) > >>> .map((k, v) => s"$r" -> v) > >>> .peek((k, v) => println(s"even: $k -> $v")) > >>> .groupByKey(Grouped.`with`) > >>> .windowedBy(TimeWindows.of(Duration.of(1, > ChronoUnit.MINUTES)).grace(Duration.of(1, ChronoUnit.MINUTES))) > >>> .reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int, > ByteArrayWindowStore]("even_store")) > >>> .toStream((windowedKey, _) => windowedKey.key()) > >>> .peek((k, v) => println(s"even toStream: $k -> $v")) > >>> .groupByKey(Grouped.`with`(Serdes.String, Serdes.Integer)) > >>> .reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int, > ByteArrayKeyValueStore]("even_store_2")(Serdes.String, Serdes.Integer)) > >>> > >>> val i2 = branched(1) > >>> .map((k, v) => s"$r" -> v) > >>> .peek((k, v) => println(s"odd: $k -> $v")) > >>> .groupByKey(Grouped.`with`(Serdes.String, Serdes.Integer)) > >>> .windowedBy(TimeWindows.of(Duration.of(1, > ChronoUnit.MINUTES)).grace(Duration.of(1, ChronoUnit.MINUTES))) > >>> .reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int, > ByteArrayWindowStore]("odd_store")(Serdes.String, Serdes.Integer)) > >>> .toStream((windowedKey, _) => windowedKey.key()) > >>> .peek((k, v) => println(s"even toStream: $k -> $v")) > >>> .groupByKey(Grouped.`with`(Serdes.String, Serdes.Integer)) > >>> .reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int, > ByteArrayKeyValueStore]("odd_store_2")(Serdes.String, Serdes.Integer)) > >>> > >>> i1.outerJoin(i2)((v1, v2) => v1 + v2) > >>> .toStream > >>> .peek((k, v) => println(s"join: $k -> $v")) > >>> > >>> val streams: KafkaStreams = new KafkaStreams(builder.build(), config) > >>> streams.start() > >>> > >>> // Add shutdown hook to respond to SIGTERM and gracefully close > Kafka Streams > >>> sys.ShutdownHookThread { > >>> val _ = streams.close(Duration.ofSeconds(10)) > >>> } > >>> } > >>> > >>> When trying to run it with 2.4.0 we get: > >> > >>> 12:39:57,443 |-INFO in ch.qos.logback.classic.LoggerContext[default] - > >>> Could NOT find resource [logback.groovy] > >> > >> 12:39:57,443 |-INFO in ch.qos.logback.classic.LoggerContext[default] - > >> Found resource [logback-test.xml] at > >> > [file:/Users/nitaykufert/Supersonic/sonic/app/services/bos/service/target/scala-2.11/classes/logback-test.xml] > >> 12:39:57,444 |-WARN in ch.qos.logback.classic.LoggerContext[default] - > >> Resource [logback-test.xml] occurs multiple times on the classpath. > >> 12:39:57,444 |-WARN in ch.qos.logback.classic.LoggerContext[default] - > >> Resource [logback-test.xml] occurs at > >> > [file:/Users/nitaykufert/Supersonic/sonic/app/services/bos/service/target/scala-2.11/classes/logback-test.xml] > >> 12:39:57,444 |-WARN in ch.qos.logback.classic.LoggerContext[default] - > >> Resource [logback-test.xml] occurs at > >> > [file:/Users/nitaykufert/Supersonic/sonic/app/utils/consumers-common/target/scala-2.11/classes/logback-test.xml] > >> 12:39:57,518 |-INFO in > >> ch.qos.logback.classic.joran.action.ConfigurationAction - debug > attribute > >> not set > >> 12:39:57,525 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - > >> About to instantiate appender of type > [ch.qos.logback.core.ConsoleAppender] > >> 12:39:57,533 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - > >> Naming appender as [STDOUT] > >> 12:39:57,539 |-INFO in > >> ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming > default > >> type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] > >> property > >> 12:39:57,589 |-INFO in > >> ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of > >> ROOT logger to ERROR > >> 12:39:57,589 |-INFO in > ch.qos.logback.core.joran.action.AppenderRefAction > >> - Attaching appender named [STDOUT] to Logger[ROOT] > >> 12:39:57,591 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction > - > >> Setting level of logger [OnlineBudgetConsumer] to DEBUG > >> 12:39:57,591 |-INFO in > >> ch.qos.logback.classic.joran.action.ConfigurationAction - End of > >> configuration. > >> 12:39:57,591 |-INFO in > >> ch.qos.logback.classic.joran.JoranConfigurator@4fb64261 - Registering > >> current configuration as safe fallback point > >> 12:39:58.523 ERROR ?#?:? > >> - stream-thread > >> > [wordcount-scala-application-30c026af-69f5-4e56-9479-b6bd764990a1-StreamThread-1] > >> Encountered the following error during processing: > >> java.lang.IllegalArgumentException: Number of partitions must be at > least > >> 1. > >> at > >> > org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) > >> at > >> > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > >> at > >> > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > >> at > >> > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) > >> at > >> > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) > >> at > >> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) > >> at > >> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) > >> 12:39:58.541 ERROR ?#?:? > >> - stream-client > >> [wordcount-scala-application-30c026af-69f5-4e56-9479-b6bd764990a1] All > >> stream threads have died. The instance will be in error state and > should be > >> closed. > >> Exception in thread > >> > "wordcount-scala-application-30c026af-69f5-4e56-9479-b6bd764990a1-StreamThread-1" > >> java.lang.IllegalArgumentException: Number of partitions must be at > least 1. > >> at > >> > org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) > >> at > >> > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > >> at > >> > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > >> at > >> > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400) > >> at > >> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) > >> at > >> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) > >> at > >> > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) > >> at > >> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) > >> at > >> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) > >> at > >> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) > >> Process finished with exit code 0 > >> > >> > >> On Wed, Dec 25, 2019 at 12:56 PM Nitay Kufert <nita...@ironsrc.com> > wrote: > >> > >>> Debugged a little, and I think the problem occurs when using > *outerJoin *and > >>> it always the repartition topic of the 2nd ktable of the join. > >>> > >>> I also didn't mentioned we are working with Scala 2.11.12 > >>> > >>> On Tue, Dec 24, 2019 at 6:15 PM Nitay Kufert <nita...@ironsrc.com> > wrote: > >>> > >>>> Hey, thanks for the response! > >>>> > >>>> Clarifying the feature flag: > >>>> We have a flag that controls how our streams application is built > >>>> (meaning, the topology is changing according to this flag). > >>>> To be fair, the flag is set to TRUE for the last couple of months - so > >>>> the fact that the streams application works without it is just to > provide > >>>> some indication where the problem may be. > >>>> > >>>> I don't think it has anything to do with the migration guide you > >>>> supplied since I am testing it on my own local ENV and I can have a > fresh > >>>> start each test (no need to migrate nothing - just as if its the > first time > >>>> I even run the app). > >>>> > >>>> Regarding the topology - I can't get to a point where I can print the > >>>> topology with 2.4 as I can't get the application to start. > >>>> The topology of the working streams app (kafka 2.3.1) can be found > here: > >>>> https://gist.github.com/nitayk/8c4f70f42e827686e7ad87e4e9f34074 > >>>> Kind of complex, but the error is thrown over the following topic: > >>>> *cdr_i_store_is_final-repartition* > >>>> and generally speaking, when I am closing the feature flag, > *sub-topologies > >>>> 5,6,7 are gone *and everything seems to work fine (here is the > topology > >>>> when the toggle is OFF): > >>>> https://gist.github.com/nitayk/2460cd6ac5d8122a31cee90991bc03a8 > >>>> > >>>> I didn't dig deep enough in the source code as I can't find the time > to > >>>> do it > >>>> But its pretty clear there is some unexpected behavior as I can make > it > >>>> work by just "removing" some code > >>>> > >>>> Any ideas? > >>>> > >>>> On Tue, Dec 24, 2019 at 1:06 AM Guozhang Wang <wangg...@gmail.com> > >>>> wrote: > >>>> > >>>>> Also please note the upgrade guide > >>>>> https://kafka.apache.org/documentation/streams/upgrade-guide that > when > >>>>> you > >>>>> upgrade from lower version to 2.4 you'd need to set the upgrade.from > >>>>> config, AND if you use optimization there's a small change you'd > need to > >>>>> make as well. > >>>>> > >>>>> On Mon, Dec 23, 2019 at 3:03 PM Guozhang Wang <wangg...@gmail.com> > >>>>> wrote: > >>>>> > >>>>>> Hello Nitay, > >>>>>> > >>>>>> Could you share the topology description on both 2.4 and 2.3.1, and > >>>>> also > >>>>>> could you elaborate on the feature flag you turned on / off? > >>>>>> > >>>>>> > >>>>>> Guozhang > >>>>>> > >>>>>> On Mon, Dec 23, 2019 at 9:32 AM Nitay Kufert <nita...@ironsrc.com> > >>>>> wrote: > >>>>>> > >>>>>>> Hello, > >>>>>>> > >>>>>>> So as the title says, I am trying to upgrade our streams client to > >>>>> the new > >>>>>>> version (2.4) - and when trying to run our app with the new version > >>>>> on my > >>>>>>> local machine (and our test env) i keep getting the following > error: > >>>>>>> > >>>>>>>> java.lang.IllegalArgumentException: Number of partitions must be > at > >>>>>>> least > >>>>>>>> 1. > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) > >>>>>>>> at > >>>>>>>> > >>>>>>> > >>>>> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) > >>>>>>>> > >>>>>>> > >>>>>>> The topic which this error report is a repartition topic of one of > >>>>> our > >>>>>>> state operation - meaning this is an internal topic which should be > >>>>>>> created > >>>>>>> by the app with the same number of partitions as the input topic. > >>>>>>> > >>>>>>> When running the app with 2.4 version, this topic doesn't even get > >>>>>>> created. > >>>>>>> when trying to run the app with the previous version (2.3.1) - > which > >>>>>>> creates the topics, then switching back to 2.4 - I still get the > same > >>>>>>> error. > >>>>>>> > >>>>>>> Following this error, it seems that this internal topic doesn't > have > >>>>>>> "metadata" regarding the number of partitions ( > >>>>>>> > >>>>>>> > >>>>> > repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()- > >>>>>>> returns false). > >>>>>>> > >>>>>>> This specific stream operation has a feature flag - so if I turn > the > >>>>>>> feature flag OFF everything seems to work. > >>>>>>> > >>>>>>> Am I missing something? Maybe a new feature requires some new > >>>>>>> configurations? > >>>>>>> Any known bug? > >>>>>>> > >>>>>>> Help would be much appreciated > >>>>>>> > >>>>>>> Let me know if you need more information (I can try and describe > our > >>>>>>> topology - pretty complex) > >>>>>>> > >>>>>>> Thanks! > >>>>>>> > >>>>>>> -- > >>>>>>> > >>>>>>> Nitay Kufert > >>>>>>> Backend Team Leader > >>>>>>> [image: ironSource] <http://www.ironsrc.com> > >>>>>>> > >>>>>>> email nita...@ironsrc.com > >>>>>>> mobile +972-54-5480021 > >>>>>>> fax +972-77-5448273 > >>>>>>> skype nitay.kufert.ssa > >>>>>>> 121 Menachem Begin St., Tel Aviv, Israel > >>>>>>> ironsrc.com <http://www.ironsrc.com> > >>>>>>> [image: linkedin] <https://www.linkedin.com/company/ironsource> > >>>>> [image: > >>>>>>> twitter] <https://twitter.com/ironsource> [image: facebook] > >>>>>>> <https://www.facebook.com/ironSource> [image: googleplus] > >>>>>>> <https://plus.google.com/+ironsrc> > >>>>>>> This email (including any attachments) is for the sole use of the > >>>>> intended > >>>>>>> recipient and may contain confidential information which may be > >>>>> protected > >>>>>>> by legal privilege. If you are not the intended recipient, or the > >>>>> employee > >>>>>>> or agent responsible for delivering it to the intended recipient, > >>>>> you are > >>>>>>> hereby notified that any use, dissemination, distribution or > copying > >>>>> of > >>>>>>> this communication and/or its content is strictly prohibited. If > you > >>>>> are > >>>>>>> not the intended recipient, please immediately notify us by reply > >>>>> email or > >>>>>>> by telephone, delete this email and destroy any copies. Thank you. > >>>>>>> > >>>>>> > >>>>>> > >>>>>> -- > >>>>>> -- Guozhang > >>>>>> > >>>>> > >>>>> > >>>>> -- > >>>>> -- Guozhang > >>>>> > >>>> > >>>> > >>>> -- > >>>> > >>>> Nitay Kufert > >>>> Backend Team Leader > >>>> [image: ironSource] <http://www.ironsrc.com> > >>>> > >>>> email nita...@ironsrc.com > >>>> mobile +972-54-5480021 > >>>> fax +972-77-5448273 > >>>> skype nitay.kufert.ssa > >>>> 121 Menachem Begin St., Tel Aviv, Israel > >>>> ironsrc.com <http://www.ironsrc.com> > >>>> [image: linkedin] <https://www.linkedin.com/company/ironsource> > [image: > >>>> twitter] <https://twitter.com/ironsource> [image: facebook] > >>>> <https://www.facebook.com/ironSource> [image: googleplus] > >>>> <https://plus.google.com/+ironsrc> > >>>> This email (including any attachments) is for the sole use of the > >>>> intended recipient and may contain confidential information which may > be > >>>> protected by legal privilege. If you are not the intended recipient, > or the > >>>> employee or agent responsible for delivering it to the intended > recipient, > >>>> you are hereby notified that any use, dissemination, distribution or > >>>> copying of this communication and/or its content is strictly > prohibited. If > >>>> you are not the intended recipient, please immediately notify us by > reply > >>>> email or by telephone, delete this email and destroy any copies. > Thank you. > >>>> > >>> > >>> > >>> -- > >>> > >>> Nitay Kufert > >>> Backend Team Leader > >>> [image: ironSource] <http://www.ironsrc.com> > >>> > >>> email nita...@ironsrc.com > >>> mobile +972-54-5480021 > >>> fax +972-77-5448273 > >>> skype nitay.kufert.ssa > >>> 121 Menachem Begin St., Tel Aviv, Israel > >>> ironsrc.com <http://www.ironsrc.com> > >>> [image: linkedin] <https://www.linkedin.com/company/ironsource> > [image: > >>> twitter] <https://twitter.com/ironsource> [image: facebook] > >>> <https://www.facebook.com/ironSource> [image: googleplus] > >>> <https://plus.google.com/+ironsrc> > >>> This email (including any attachments) is for the sole use of the > >>> intended recipient and may contain confidential information which may > be > >>> protected by legal privilege. If you are not the intended recipient, > or the > >>> employee or agent responsible for delivering it to the intended > recipient, > >>> you are hereby notified that any use, dissemination, distribution or > >>> copying of this communication and/or its content is strictly > prohibited. If > >>> you are not the intended recipient, please immediately notify us by > reply > >>> email or by telephone, delete this email and destroy any copies. Thank > you. > >>> > >> > >> > >> -- > >> > >> Nitay Kufert > >> Backend Team Leader > >> [image: ironSource] <http://www.ironsrc.com> > >> > >> email nita...@ironsrc.com > >> mobile +972-54-5480021 > >> fax +972-77-5448273 > >> skype nitay.kufert.ssa > >> 121 Menachem Begin St., Tel Aviv, Israel > >> ironsrc.com <http://www.ironsrc.com> > >> [image: linkedin] <https://www.linkedin.com/company/ironsource> [image: > >> twitter] <https://twitter.com/ironsource> [image: facebook] > >> <https://www.facebook.com/ironSource> [image: googleplus] > >> <https://plus.google.com/+ironsrc> > >> This email (including any attachments) is for the sole use of the > intended > >> recipient and may contain confidential information which may be > protected > >> by legal privilege. If you are not the intended recipient, or the > employee > >> or agent responsible for delivering it to the intended recipient, you > are > >> hereby notified that any use, dissemination, distribution or copying of > >> this communication and/or its content is strictly prohibited. If you are > >> not the intended recipient, please immediately notify us by reply email > or > >> by telephone, delete this email and destroy any copies. Thank you. > >> > > > > > > -- -- Guozhang