Thanks Nitay. This is very helpful for our investigation, will look into
the ticket.


Guozhang

On Thu, Dec 26, 2019 at 12:19 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks Nitay! We will investigate!
>
>
> -Matthias
>
> On 12/26/19 2:53 AM, Nitay Kufert wrote:
> > Also posted it at apache jira:
> > https://issues.apache.org/jira/browse/KAFKA-9335
> >
> > On Thu, Dec 26, 2019 at 12:41 PM Nitay Kufert <nita...@ironsrc.com>
> wrote:
> >
> >> I have made a "toy" example to reproduce this error, this is more or
> less
> >> what's going on in our application:
> >>>
> >>> package com.supersonic.bos.consumer
> >>>
> >>> import java.time.Duration
> >>> import java.time.temporal.ChronoUnit
> >>> import java.util.Properties
> >>> import org.apache.kafka.common.serialization.Serde
> >>> import org.apache.kafka.streams.kstream.TimeWindows
> >>> import org.apache.kafka.streams.scala.kstream._
> >>> import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore,
> ByteArrayWindowStore, Serdes, StreamsBuilder}
> >>> import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
> >>> import scala.util.{Random, Try}
> >>>
> >>> object MissingPartitionExample extends App {
> >>>
> >>>   implicit val stringSerde: Serde[String] = Serdes.String
> >>>   implicit val integerSerde: Serde[Integer] = Serdes.JavaInteger
> >>>   implicit val intSerde: Serde[Int] = Serdes.Integer
> >>>
> >>>   val config: Properties = {
> >>>     val p = new Properties()
> >>>     p.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "wordcount-scala-application")
> >>>     val bootstrapServers = if (args.length > 0) args(0) else
> "localhost:9092"
> >>>     p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
> >>>     p
> >>>   }
> >>>
> >>>   val builder = new StreamsBuilder()
> >>>   val input: KStream[String, String] = builder.stream[String,
> String]("input_numbers_topic")(Consumed.`with`(Serdes.String,
> Serdes.String))
> >>>   val branched = input
> >>>     .peek((k, v) => println(s"input: $k -> $v"))
> >>>     .flatMapValues(str => Try(str.toInt).toOption)
> >>>     .branch(
> >>>       (_, v) => v % 2 == 0,
> >>>       (_, v) => v % 2 != 0
> >>>     )
> >>>
> >>>   val r = Random.nextInt(2)
> >>>
> >>>   val i1 = branched(0)
> >>>     .map((k, v) => s"$r" -> v)
> >>>     .peek((k, v) => println(s"even: $k -> $v"))
> >>>     .groupByKey(Grouped.`with`)
> >>>     .windowedBy(TimeWindows.of(Duration.of(1,
> ChronoUnit.MINUTES)).grace(Duration.of(1, ChronoUnit.MINUTES)))
> >>>     .reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int,
> ByteArrayWindowStore]("even_store"))
> >>>     .toStream((windowedKey, _) => windowedKey.key())
> >>>     .peek((k, v) => println(s"even toStream: $k -> $v"))
> >>>     .groupByKey(Grouped.`with`(Serdes.String, Serdes.Integer))
> >>>     .reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int,
> ByteArrayKeyValueStore]("even_store_2")(Serdes.String, Serdes.Integer))
> >>>
> >>>   val i2 = branched(1)
> >>>     .map((k, v) => s"$r" -> v)
> >>>     .peek((k, v) => println(s"odd: $k -> $v"))
> >>>     .groupByKey(Grouped.`with`(Serdes.String, Serdes.Integer))
> >>>     .windowedBy(TimeWindows.of(Duration.of(1,
> ChronoUnit.MINUTES)).grace(Duration.of(1, ChronoUnit.MINUTES)))
> >>>     .reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int,
> ByteArrayWindowStore]("odd_store")(Serdes.String, Serdes.Integer))
> >>>     .toStream((windowedKey, _) => windowedKey.key())
> >>>     .peek((k, v) => println(s"even toStream: $k -> $v"))
> >>>     .groupByKey(Grouped.`with`(Serdes.String, Serdes.Integer))
> >>>     .reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int,
> ByteArrayKeyValueStore]("odd_store_2")(Serdes.String, Serdes.Integer))
> >>>
> >>>     i1.outerJoin(i2)((v1, v2) => v1 + v2)
> >>>       .toStream
> >>>       .peek((k, v) => println(s"join: $k -> $v"))
> >>>
> >>>   val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
> >>>   streams.start()
> >>>
> >>>   // Add shutdown hook to respond to SIGTERM and gracefully close
> Kafka Streams
> >>>   sys.ShutdownHookThread {
> >>>     val _ = streams.close(Duration.ofSeconds(10))
> >>>   }
> >>> }
> >>>
> >>> When trying to run it with 2.4.0 we get:
> >>
> >>> 12:39:57,443 |-INFO in ch.qos.logback.classic.LoggerContext[default] -
> >>> Could NOT find resource [logback.groovy]
> >>
> >> 12:39:57,443 |-INFO in ch.qos.logback.classic.LoggerContext[default] -
> >> Found resource [logback-test.xml] at
> >>
> [file:/Users/nitaykufert/Supersonic/sonic/app/services/bos/service/target/scala-2.11/classes/logback-test.xml]
> >> 12:39:57,444 |-WARN in ch.qos.logback.classic.LoggerContext[default] -
> >> Resource [logback-test.xml] occurs multiple times on the classpath.
> >> 12:39:57,444 |-WARN in ch.qos.logback.classic.LoggerContext[default] -
> >> Resource [logback-test.xml] occurs at
> >>
> [file:/Users/nitaykufert/Supersonic/sonic/app/services/bos/service/target/scala-2.11/classes/logback-test.xml]
> >> 12:39:57,444 |-WARN in ch.qos.logback.classic.LoggerContext[default] -
> >> Resource [logback-test.xml] occurs at
> >>
> [file:/Users/nitaykufert/Supersonic/sonic/app/utils/consumers-common/target/scala-2.11/classes/logback-test.xml]
> >> 12:39:57,518 |-INFO in
> >> ch.qos.logback.classic.joran.action.ConfigurationAction - debug
> attribute
> >> not set
> >> 12:39:57,525 |-INFO in ch.qos.logback.core.joran.action.AppenderAction -
> >> About to instantiate appender of type
> [ch.qos.logback.core.ConsoleAppender]
> >> 12:39:57,533 |-INFO in ch.qos.logback.core.joran.action.AppenderAction -
> >> Naming appender as [STDOUT]
> >> 12:39:57,539 |-INFO in
> >> ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming
> default
> >> type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder]
> >> property
> >> 12:39:57,589 |-INFO in
> >> ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of
> >> ROOT logger to ERROR
> >> 12:39:57,589 |-INFO in
> ch.qos.logback.core.joran.action.AppenderRefAction
> >> - Attaching appender named [STDOUT] to Logger[ROOT]
> >> 12:39:57,591 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction
> -
> >> Setting level of logger [OnlineBudgetConsumer] to DEBUG
> >> 12:39:57,591 |-INFO in
> >> ch.qos.logback.classic.joran.action.ConfigurationAction - End of
> >> configuration.
> >> 12:39:57,591 |-INFO in
> >> ch.qos.logback.classic.joran.JoranConfigurator@4fb64261 - Registering
> >> current configuration as safe fallback point
> >> 12:39:58.523 ERROR ?#?:?
> >>  - stream-thread
> >>
> [wordcount-scala-application-30c026af-69f5-4e56-9479-b6bd764990a1-StreamThread-1]
> >> Encountered the following error during processing:
> >> java.lang.IllegalArgumentException: Number of partitions must be at
> least
> >> 1.
> >> at
> >>
> org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
> >> at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
> >> at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
> >> at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> >> 12:39:58.541 ERROR ?#?:?
> >>  - stream-client
> >> [wordcount-scala-application-30c026af-69f5-4e56-9479-b6bd764990a1] All
> >> stream threads have died. The instance will be in error state and
> should be
> >> closed.
> >> Exception in thread
> >>
> "wordcount-scala-application-30c026af-69f5-4e56-9479-b6bd764990a1-StreamThread-1"
> >> java.lang.IllegalArgumentException: Number of partitions must be at
> least 1.
> >> at
> >>
> org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
> >> at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
> >> at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
> >> at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
> >> at
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> >> Process finished with exit code 0
> >>
> >>
> >> On Wed, Dec 25, 2019 at 12:56 PM Nitay Kufert <nita...@ironsrc.com>
> wrote:
> >>
> >>> Debugged a little, and I think the problem occurs when using
> *outerJoin *and
> >>> it always the repartition topic of the 2nd ktable of the join.
> >>>
> >>> I also didn't mentioned we are working with Scala 2.11.12
> >>>
> >>> On Tue, Dec 24, 2019 at 6:15 PM Nitay Kufert <nita...@ironsrc.com>
> wrote:
> >>>
> >>>> Hey, thanks for the response!
> >>>>
> >>>> Clarifying the feature flag:
> >>>> We have a flag that controls how our streams application is built
> >>>> (meaning, the topology is changing according to this flag).
> >>>> To be fair, the flag is set to TRUE for the last couple of months - so
> >>>> the fact that the streams application works without it is just to
> provide
> >>>> some indication where the problem may be.
> >>>>
> >>>> I don't think it has anything to do with the migration guide you
> >>>> supplied since I am testing it on my own local ENV and I can have a
> fresh
> >>>> start each test (no need to migrate nothing - just as if its the
> first time
> >>>> I even run the app).
> >>>>
> >>>> Regarding the topology - I can't get to a point where I can print the
> >>>> topology with 2.4 as I can't get the application to start.
> >>>> The topology of the working streams app (kafka 2.3.1) can be found
> here:
> >>>> https://gist.github.com/nitayk/8c4f70f42e827686e7ad87e4e9f34074
> >>>> Kind of complex, but the error is thrown over the following topic:
> >>>> *cdr_i_store_is_final-repartition*
> >>>> and generally speaking, when I am closing the feature flag,
> *sub-topologies
> >>>> 5,6,7 are gone *and everything seems to work fine (here is the
> topology
> >>>> when the toggle is OFF):
> >>>> https://gist.github.com/nitayk/2460cd6ac5d8122a31cee90991bc03a8
> >>>>
> >>>> I didn't dig deep enough in the source code as I can't find the time
> to
> >>>> do it
> >>>> But its pretty clear there is some unexpected behavior as I can make
> it
> >>>> work by just "removing" some code
> >>>>
> >>>> Any ideas?
> >>>>
> >>>> On Tue, Dec 24, 2019 at 1:06 AM Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Also please note the upgrade guide
> >>>>> https://kafka.apache.org/documentation/streams/upgrade-guide that
> when
> >>>>> you
> >>>>> upgrade from lower version to 2.4 you'd need to set the upgrade.from
> >>>>> config, AND if you use optimization there's a small change you'd
> need to
> >>>>> make as well.
> >>>>>
> >>>>> On Mon, Dec 23, 2019 at 3:03 PM Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hello Nitay,
> >>>>>>
> >>>>>> Could you share the topology description on both 2.4 and 2.3.1, and
> >>>>> also
> >>>>>> could you elaborate on the feature flag you turned on / off?
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Mon, Dec 23, 2019 at 9:32 AM Nitay Kufert <nita...@ironsrc.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> So as the title says, I am trying to upgrade our streams client to
> >>>>> the new
> >>>>>>> version (2.4) - and when trying to run our app with the new version
> >>>>> on my
> >>>>>>> local machine (and our test env) i keep getting the following
> error:
> >>>>>>>
> >>>>>>>> java.lang.IllegalArgumentException: Number of partitions must be
> at
> >>>>>>> least
> >>>>>>>> 1.
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
> >>>>>>>> at
> >>>>>>>>
> >>>>>>>
> >>>>>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> >>>>>>>>
> >>>>>>>
> >>>>>>> The topic which this error report is a repartition topic of one of
> >>>>> our
> >>>>>>> state operation - meaning this is an internal topic which should be
> >>>>>>> created
> >>>>>>> by the app with the same number of partitions as the input topic.
> >>>>>>>
> >>>>>>> When running the app with 2.4 version, this topic doesn't even get
> >>>>>>> created.
> >>>>>>> when trying to run the app with the previous version (2.3.1) -
> which
> >>>>>>> creates the topics, then switching back to 2.4 - I still get the
> same
> >>>>>>> error.
> >>>>>>>
> >>>>>>> Following this error, it seems that this internal topic doesn't
> have
> >>>>>>> "metadata" regarding the number of partitions (
> >>>>>>>
> >>>>>>>
> >>>>>
> repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()-
> >>>>>>> returns false).
> >>>>>>>
> >>>>>>> This specific stream operation has a feature flag - so if I turn
> the
> >>>>>>> feature flag OFF everything seems to work.
> >>>>>>>
> >>>>>>> Am I missing something? Maybe a new feature requires some new
> >>>>>>> configurations?
> >>>>>>> Any known bug?
> >>>>>>>
> >>>>>>> Help would be much appreciated
> >>>>>>>
> >>>>>>> Let me know if you need more information (I can try and describe
> our
> >>>>>>> topology - pretty complex)
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>>
> >>>>>>> --
> >>>>>>>
> >>>>>>> Nitay Kufert
> >>>>>>> Backend Team Leader
> >>>>>>> [image: ironSource] <http://www.ironsrc.com>
> >>>>>>>
> >>>>>>> email nita...@ironsrc.com
> >>>>>>> mobile +972-54-5480021
> >>>>>>> fax +972-77-5448273
> >>>>>>> skype nitay.kufert.ssa
> >>>>>>> 121 Menachem Begin St., Tel Aviv, Israel
> >>>>>>> ironsrc.com <http://www.ironsrc.com>
> >>>>>>> [image: linkedin] <https://www.linkedin.com/company/ironsource>
> >>>>> [image:
> >>>>>>> twitter] <https://twitter.com/ironsource> [image: facebook]
> >>>>>>> <https://www.facebook.com/ironSource> [image: googleplus]
> >>>>>>> <https://plus.google.com/+ironsrc>
> >>>>>>> This email (including any attachments) is for the sole use of the
> >>>>> intended
> >>>>>>> recipient and may contain confidential information which may be
> >>>>> protected
> >>>>>>> by legal privilege. If you are not the intended recipient, or the
> >>>>> employee
> >>>>>>> or agent responsible for delivering it to the intended recipient,
> >>>>> you are
> >>>>>>> hereby notified that any use, dissemination, distribution or
> copying
> >>>>> of
> >>>>>>> this communication and/or its content is strictly prohibited. If
> you
> >>>>> are
> >>>>>>> not the intended recipient, please immediately notify us by reply
> >>>>> email or
> >>>>>>> by telephone, delete this email and destroy any copies. Thank you.
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>>
> >>>> Nitay Kufert
> >>>> Backend Team Leader
> >>>> [image: ironSource] <http://www.ironsrc.com>
> >>>>
> >>>> email nita...@ironsrc.com
> >>>> mobile +972-54-5480021
> >>>> fax +972-77-5448273
> >>>> skype nitay.kufert.ssa
> >>>> 121 Menachem Begin St., Tel Aviv, Israel
> >>>> ironsrc.com <http://www.ironsrc.com>
> >>>> [image: linkedin] <https://www.linkedin.com/company/ironsource>
> [image:
> >>>> twitter] <https://twitter.com/ironsource> [image: facebook]
> >>>> <https://www.facebook.com/ironSource> [image: googleplus]
> >>>> <https://plus.google.com/+ironsrc>
> >>>> This email (including any attachments) is for the sole use of the
> >>>> intended recipient and may contain confidential information which may
> be
> >>>> protected by legal privilege. If you are not the intended recipient,
> or the
> >>>> employee or agent responsible for delivering it to the intended
> recipient,
> >>>> you are hereby notified that any use, dissemination, distribution or
> >>>> copying of this communication and/or its content is strictly
> prohibited. If
> >>>> you are not the intended recipient, please immediately notify us by
> reply
> >>>> email or by telephone, delete this email and destroy any copies.
> Thank you.
> >>>>
> >>>
> >>>
> >>> --
> >>>
> >>> Nitay Kufert
> >>> Backend Team Leader
> >>> [image: ironSource] <http://www.ironsrc.com>
> >>>
> >>> email nita...@ironsrc.com
> >>> mobile +972-54-5480021
> >>> fax +972-77-5448273
> >>> skype nitay.kufert.ssa
> >>> 121 Menachem Begin St., Tel Aviv, Israel
> >>> ironsrc.com <http://www.ironsrc.com>
> >>> [image: linkedin] <https://www.linkedin.com/company/ironsource>
> [image:
> >>> twitter] <https://twitter.com/ironsource> [image: facebook]
> >>> <https://www.facebook.com/ironSource> [image: googleplus]
> >>> <https://plus.google.com/+ironsrc>
> >>> This email (including any attachments) is for the sole use of the
> >>> intended recipient and may contain confidential information which may
> be
> >>> protected by legal privilege. If you are not the intended recipient,
> or the
> >>> employee or agent responsible for delivering it to the intended
> recipient,
> >>> you are hereby notified that any use, dissemination, distribution or
> >>> copying of this communication and/or its content is strictly
> prohibited. If
> >>> you are not the intended recipient, please immediately notify us by
> reply
> >>> email or by telephone, delete this email and destroy any copies. Thank
> you.
> >>>
> >>
> >>
> >> --
> >>
> >> Nitay Kufert
> >> Backend Team Leader
> >> [image: ironSource] <http://www.ironsrc.com>
> >>
> >> email nita...@ironsrc.com
> >> mobile +972-54-5480021
> >> fax +972-77-5448273
> >> skype nitay.kufert.ssa
> >> 121 Menachem Begin St., Tel Aviv, Israel
> >> ironsrc.com <http://www.ironsrc.com>
> >> [image: linkedin] <https://www.linkedin.com/company/ironsource> [image:
> >> twitter] <https://twitter.com/ironsource> [image: facebook]
> >> <https://www.facebook.com/ironSource> [image: googleplus]
> >> <https://plus.google.com/+ironsrc>
> >> This email (including any attachments) is for the sole use of the
> intended
> >> recipient and may contain confidential information which may be
> protected
> >> by legal privilege. If you are not the intended recipient, or the
> employee
> >> or agent responsible for delivering it to the intended recipient, you
> are
> >> hereby notified that any use, dissemination, distribution or copying of
> >> this communication and/or its content is strictly prohibited. If you are
> >> not the intended recipient, please immediately notify us by reply email
> or
> >> by telephone, delete this email and destroy any copies. Thank you.
> >>
> >
> >
>
>

-- 
-- Guozhang

Reply via email to