Thanks Nitay! We will investigate!

-Matthias

On 12/26/19 2:53 AM, Nitay Kufert wrote:
> I also posted it on the Apache Jira:
> https://issues.apache.org/jira/browse/KAFKA-9335
> 
> On Thu, Dec 26, 2019 at 12:41 PM Nitay Kufert <nita...@ironsrc.com> wrote:
> 
>> I have made a "toy" example to reproduce this error; it is more or less
>> what's going on in our application:
>>>
>>> package com.supersonic.bos.consumer
>>>
>>> import java.time.Duration
>>> import java.time.temporal.ChronoUnit
>>> import java.util.Properties
>>> import org.apache.kafka.common.serialization.Serde
>>> import org.apache.kafka.streams.kstream.TimeWindows
>>> import org.apache.kafka.streams.scala.kstream._
>>> import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, ByteArrayWindowStore, Serdes, StreamsBuilder}
>>> import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
>>> import scala.util.{Random, Try}
>>>
>>> object MissingPartitionExample extends App {
>>>
>>>   implicit val stringSerde: Serde[String] = Serdes.String
>>>   implicit val integerSerde: Serde[Integer] = Serdes.JavaInteger
>>>   implicit val intSerde: Serde[Int] = Serdes.Integer
>>>
>>>   val config: Properties = {
>>>     val p = new Properties()
>>>     p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-scala-application")
>>>     val bootstrapServers = if (args.length > 0) args(0) else "localhost:9092"
>>>     p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
>>>     p
>>>   }
>>>
>>>   val builder = new StreamsBuilder()
>>>   val input: KStream[String, String] =
>>>     builder.stream[String, String]("input_numbers_topic")(Consumed.`with`(Serdes.String, Serdes.String))
>>>
>>>   // Parse the values as integers and split them into even (0) and odd (1) branches
>>>   val branched = input
>>>     .peek((k, v) => println(s"input: $k -> $v"))
>>>     .flatMapValues(str => Try(str.toInt).toOption)
>>>     .branch(
>>>       (_, v) => v % 2 == 0,
>>>       (_, v) => v % 2 != 0
>>>     )
>>>
>>>   // Both branches are re-keyed with the same random key
>>>   val r = Random.nextInt(2)
>>>
>>>   // Even branch: windowed reduce, then a second reduce into a KTable
>>>   val i1 = branched(0)
>>>     .map((k, v) => s"$r" -> v)
>>>     .peek((k, v) => println(s"even: $k -> $v"))
>>>     .groupByKey(Grouped.`with`)
>>>     .windowedBy(TimeWindows.of(Duration.of(1, ChronoUnit.MINUTES)).grace(Duration.of(1, ChronoUnit.MINUTES)))
>>>     .reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int, ByteArrayWindowStore]("even_store"))
>>>     .toStream((windowedKey, _) => windowedKey.key())
>>>     .peek((k, v) => println(s"even toStream: $k -> $v"))
>>>     .groupByKey(Grouped.`with`(Serdes.String, Serdes.Integer))
>>>     .reduce((v1, v2) => v1 + v2)(
>>>       Materialized.as[String, Int, ByteArrayKeyValueStore]("even_store_2")(Serdes.String, Serdes.Integer))
>>>
>>>   // Odd branch: same shape as the even branch
>>>   val i2 = branched(1)
>>>     .map((k, v) => s"$r" -> v)
>>>     .peek((k, v) => println(s"odd: $k -> $v"))
>>>     .groupByKey(Grouped.`with`(Serdes.String, Serdes.Integer))
>>>     .windowedBy(TimeWindows.of(Duration.of(1, ChronoUnit.MINUTES)).grace(Duration.of(1, ChronoUnit.MINUTES)))
>>>     .reduce((v1, v2) => v1 + v2)(
>>>       Materialized.as[String, Int, ByteArrayWindowStore]("odd_store")(Serdes.String, Serdes.Integer))
>>>     .toStream((windowedKey, _) => windowedKey.key())
>>>     .peek((k, v) => println(s"odd toStream: $k -> $v"))
>>>     .groupByKey(Grouped.`with`(Serdes.String, Serdes.Integer))
>>>     .reduce((v1, v2) => v1 + v2)(
>>>       Materialized.as[String, Int, ByteArrayKeyValueStore]("odd_store_2")(Serdes.String, Serdes.Integer))
>>>
>>>   // Outer join of the two KTables
>>>   i1.outerJoin(i2)((v1, v2) => v1 + v2)
>>>     .toStream
>>>     .peek((k, v) => println(s"join: $k -> $v"))
>>>
>>>   val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
>>>   streams.start()
>>>
>>>   // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
>>>   sys.ShutdownHookThread {
>>>     val _ = streams.close(Duration.ofSeconds(10))
>>>   }
>>> }
>>>
>>> When trying to run it with 2.4.0 we get:
>>
>>> 12:39:57,443 |-INFO in ch.qos.logback.classic.LoggerContext[default] -
>>> Could NOT find resource [logback.groovy]
>>
>> 12:39:57,443 |-INFO in ch.qos.logback.classic.LoggerContext[default] -
>> Found resource [logback-test.xml] at
>> [file:/Users/nitaykufert/Supersonic/sonic/app/services/bos/service/target/scala-2.11/classes/logback-test.xml]
>> 12:39:57,444 |-WARN in ch.qos.logback.classic.LoggerContext[default] -
>> Resource [logback-test.xml] occurs multiple times on the classpath.
>> 12:39:57,444 |-WARN in ch.qos.logback.classic.LoggerContext[default] -
>> Resource [logback-test.xml] occurs at
>> [file:/Users/nitaykufert/Supersonic/sonic/app/services/bos/service/target/scala-2.11/classes/logback-test.xml]
>> 12:39:57,444 |-WARN in ch.qos.logback.classic.LoggerContext[default] -
>> Resource [logback-test.xml] occurs at
>> [file:/Users/nitaykufert/Supersonic/sonic/app/utils/consumers-common/target/scala-2.11/classes/logback-test.xml]
>> 12:39:57,518 |-INFO in
>> ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute
>> not set
>> 12:39:57,525 |-INFO in ch.qos.logback.core.joran.action.AppenderAction -
>> About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
>> 12:39:57,533 |-INFO in ch.qos.logback.core.joran.action.AppenderAction -
>> Naming appender as [STDOUT]
>> 12:39:57,539 |-INFO in
>> ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default
>> type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder]
>> property
>> 12:39:57,589 |-INFO in
>> ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of
>> ROOT logger to ERROR
>> 12:39:57,589 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction
>> - Attaching appender named [STDOUT] to Logger[ROOT]
>> 12:39:57,591 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction -
>> Setting level of logger [OnlineBudgetConsumer] to DEBUG
>> 12:39:57,591 |-INFO in
>> ch.qos.logback.classic.joran.action.ConfigurationAction - End of
>> configuration.
>> 12:39:57,591 |-INFO in
>> ch.qos.logback.classic.joran.JoranConfigurator@4fb64261 - Registering
>> current configuration as safe fallback point
>> 12:39:58.523 ERROR ?#?:? - stream-thread [wordcount-scala-application-30c026af-69f5-4e56-9479-b6bd764990a1-StreamThread-1] Encountered the following error during processing:
>> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
>>   at org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62)
>>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>>   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>> 12:39:58.541 ERROR ?#?:? - stream-client [wordcount-scala-application-30c026af-69f5-4e56-9479-b6bd764990a1] All stream threads have died. The instance will be in error state and should be closed.
>> Exception in thread "wordcount-scala-application-30c026af-69f5-4e56-9479-b6bd764990a1-StreamThread-1" java.lang.IllegalArgumentException: Number of partitions must be at least 1.
>>   at org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62)
>>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>>   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>> Process finished with exit code 0
>>
>>
>> On Wed, Dec 25, 2019 at 12:56 PM Nitay Kufert <nita...@ironsrc.com> wrote:
>>
>>> I debugged a little, and I think the problem occurs when using *outerJoin*,
>>> and it is always the repartition topic of the 2nd KTable of the join.
>>>
>>> I also didn't mention that we are working with Scala 2.11.12.
>>>
>>> On Tue, Dec 24, 2019 at 6:15 PM Nitay Kufert <nita...@ironsrc.com> wrote:
>>>
>>>> Hey, thanks for the response!
>>>>
>>>> Clarifying the feature flag:
>>>> We have a flag that controls how our streams application is built
>>>> (meaning, the topology is changing according to this flag).
>>>> To be fair, the flag has been set to TRUE for the last couple of months, so
>>>> the fact that the streams application works without it is just to provide
>>>> some indication of where the problem may be.
>>>>
>>>> I don't think it has anything to do with the migration guide you
>>>> supplied, since I am testing it on my own local ENV and I can have a fresh
>>>> start for each test (no need to migrate anything, just as if it's the first
>>>> time I ever run the app).
>>>>
>>>> Regarding the topology - I can't get to a point where I can print the
>>>> topology with 2.4 as I can't get the application to start.
>>>> The topology of the working streams app (kafka 2.3.1) can be found here:
>>>> https://gist.github.com/nitayk/8c4f70f42e827686e7ad87e4e9f34074
>>>> Kind of complex, but the error is thrown over the following topic:
>>>> *cdr_i_store_is_final-repartition*
>>>> and generally speaking, when I turn the feature flag off, *sub-topologies
>>>> 5, 6, 7 are gone* and everything seems to work fine (here is the topology
>>>> when the toggle is OFF):
>>>> https://gist.github.com/nitayk/2460cd6ac5d8122a31cee90991bc03a8
>>>>
>>>> I didn't dig deep enough into the source code as I can't find the time to
>>>> do it.
>>>> But it's pretty clear there is some unexpected behavior, as I can make it
>>>> work by just "removing" some code.
>>>>
>>>> Any ideas?
>>>>
>>>> On Tue, Dec 24, 2019 at 1:06 AM Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>
>>>>> Also please note the upgrade guide
>>>>> https://kafka.apache.org/documentation/streams/upgrade-guide
>>>>> which says that when you upgrade from a lower version to 2.4 you need to
>>>>> set the upgrade.from config, AND if you use optimization there's a small
>>>>> change you need to make as well.
>>>>>
>>>>> On Mon, Dec 23, 2019 at 3:03 PM Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Nitay,
>>>>>>
>>>>>> Could you share the topology description on both 2.4 and 2.3.1, and also
>>>>>> could you elaborate on the feature flag you turned on / off?
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Mon, Dec 23, 2019 at 9:32 AM Nitay Kufert <nita...@ironsrc.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> So as the title says, I am trying to upgrade our streams client to the new
>>>>>>> version (2.4), and when trying to run our app with the new version on my
>>>>>>> local machine (and our test env) I keep getting the following error:
>>>>>>>
>>>>>>>> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
>>>>>>>>   at org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62)
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>>>>>>>>
>>>>>>>
>>>>>>> The topic that this error reports is a repartition topic of one of our
>>>>>>> state operations, meaning it is an internal topic that should be created
>>>>>>> by the app with the same number of partitions as the input topic.
>>>>>>>
>>>>>>> When running the app with version 2.4, this topic doesn't even get
>>>>>>> created.
>>>>>>> When I run the app with the previous version (2.3.1), which creates the
>>>>>>> topics, and then switch back to 2.4, I still get the same error.
>>>>>>>
>>>>>>> Following this error, it seems that this internal topic doesn't have
>>>>>>> "metadata" regarding the number of partitions
>>>>>>> (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()
>>>>>>> returns false).
>>>>>>>
>>>>>>> This specific stream operation has a feature flag - so if I turn the
>>>>>>> feature flag OFF everything seems to work.
>>>>>>>
>>>>>>> Am I missing something? Maybe a new feature requires some new
>>>>>>> configurations?
>>>>>>> Any known bug?
>>>>>>>
>>>>>>> Help would be much appreciated
>>>>>>>
>>>>>>> Let me know if you need more information (I can try and describe our
>>>>>>> topology - pretty complex)
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Nitay Kufert
>>>>>>> Backend Team Leader
>>>>>>> [image: ironSource] <http://www.ironsrc.com>
>>>>>>>
>>>>>>> email nita...@ironsrc.com
>>>>>>> mobile +972-54-5480021
>>>>>>> fax +972-77-5448273
>>>>>>> skype nitay.kufert.ssa
>>>>>>> 121 Menachem Begin St., Tel Aviv, Israel
>>>>>>> ironsrc.com <http://www.ironsrc.com>
>>>>>>> [image: linkedin] <https://www.linkedin.com/company/ironsource> [image:
>>>>>>> twitter] <https://twitter.com/ironsource> [image: facebook]
>>>>>>> <https://www.facebook.com/ironSource> [image: googleplus]
>>>>>>> <https://plus.google.com/+ironsrc>
>>>>>>> This email (including any attachments) is for the sole use of the
>>>>>>> intended recipient and may contain confidential information which may be
>>>>>>> protected by legal privilege. If you are not the intended recipient, or the
>>>>>>> employee or agent responsible for delivering it to the intended recipient,
>>>>>>> you are hereby notified that any use, dissemination, distribution or
>>>>>>> copying of this communication and/or its content is strictly prohibited. If
>>>>>>> you are not the intended recipient, please immediately notify us by reply
>>>>>>> email or by telephone, delete this email and destroy any copies. Thank you.
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
> 
> 
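As a rough illustration of the upgrade.from setting that Guozhang mentions in the quoted thread, the sketch below builds the Streams properties with plain string keys so that it needs no Kafka dependency. The value "2.3" is an assumption (the thread upgrades from 2.3.1), the object and method names are hypothetical, and, as Nitay notes, the setting should not matter on a fresh environment.

```scala
import java.util.Properties

object UpgradeFromConfigSketch {
  // Hypothetical helper: builds the properties for an upgrade deployment.
  // "upgrade.from" is the key behind StreamsConfig.UPGRADE_FROM_CONFIG.
  // The value "2.3" is an assumption based on the 2.3.1 version in the thread.
  def upgradeProps(bootstrapServers: String): Properties = {
    val p = new Properties()
    p.put("application.id", "wordcount-scala-application")
    p.put("bootstrap.servers", bootstrapServers)
    p.put("upgrade.from", "2.3")
    p
  }

  def main(args: Array[String]): Unit = {
    val p = upgradeProps("localhost:9092")
    println(p.getProperty("upgrade.from"))
  }
}
```

Per the upgrade guide, the setting is meant for the first rolling bounce only and is removed again before the second one.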

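The symptom Nitay describes in the quoted thread (numberOfPartitions().isPresent() returns false, followed by "Number of partitions must be at least 1.") can be modelled in a few lines. This is a toy model and not the Kafka Streams implementation: TopicMeta, derive and the repartition topic names are hypothetical, and the rule "take the largest known upstream count" is an assumption made only to show how an unresolved count could end up failing the "at least 1" check.

```scala
object PartitionCountSketch {
  // Hypothetical stand-in for a topic's metadata: the partition count is
  // None until it has been resolved.
  final case class TopicMeta(name: String, numberOfPartitions: Option[Int])

  // Rejects counts below 1, using the same message as the stack trace.
  def setNumberOfPartitions(meta: TopicMeta, n: Int): TopicMeta = {
    if (n < 1) throw new IllegalArgumentException("Number of partitions must be at least 1.")
    meta.copy(numberOfPartitions = Some(n))
  }

  // Assumed rule: a repartition topic takes the largest count known among its
  // upstream topics. If no upstream count is known, 0 is passed on and rejected.
  def derive(meta: TopicMeta, upstream: Seq[TopicMeta]): TopicMeta = {
    val known = upstream.flatMap(_.numberOfPartitions)
    val n = if (known.isEmpty) 0 else known.max
    setNumberOfPartitions(meta, n)
  }

  def main(args: Array[String]): Unit = {
    val input = TopicMeta("input_numbers_topic", Some(4))
    // Resolves, because the upstream topic has a known count.
    println(derive(TopicMeta("even_store-repartition", None), Seq(input)))
    // Fails, because the only upstream topic is itself unresolved.
    val unresolved = TopicMeta("odd_store-repartition", None)
    println(scala.util.Try(derive(TopicMeta("odd_store_2-repartition", None), Seq(unresolved))))
  }
}
```

In this model the failure depends only on the order in which counts are resolved, which is one possible reading of why a chained repartition topic could be affected.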