Thanks for reporting this, Hamid. ProcessorTopologyTestDriver is used only in ProcessorTopologyTest and is not currently expected to be used anywhere else, which is why we override the MockProducer's partitionsFor function to return only an empty list. Is there a particular reason you want to use ProcessorTopologyTestDriver in your testing / staging environment? In general I'd recommend using the full-fledged Kafka Streams library for integration tests with synthetic test data.
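To illustrate why an empty partition list is a problem, here is a minimal sketch of a hash-modulo partitioner receiving a partition count of zero, along with one possible guard. The class and method names (PartitionGuardSketch, hashModPartition, guardedPartition) are hypothetical and are not the actual Kafka Streams code; the sketch only assumes that the partition is chosen as a non-negative hash modulo the number of partitions.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PartitionGuardSketch {

    // Hypothetical stand-in for a hash-modulo partitioner (not the real
    // WindowedStreamPartitioner). Clears the sign bit so the hash is
    // non-negative, then takes the remainder. With numPartitions == 0 the
    // integer remainder throws ArithmeticException.
    static int hashModPartition(int keyHash, int numPartitions) {
        return (keyHash & 0x7fffffff) % numPartitions;
    }

    // Hypothetical guard: check for an empty partition list up front and
    // fail with a message that names the topic, instead of letting the
    // arithmetic error surface from deep inside the partitioner.
    static int guardedPartition(String topic, int keyHash, List<Integer> partitions) {
        int numPartitions = partitions.size();
        if (numPartitions == 0) {
            throw new IllegalStateException(
                "No partitions available for topic " + topic
                + "; cannot compute a partition for the record");
        }
        return hashModPartition(keyHash, numPartitions);
    }

    public static void main(String[] args) {
        // A non-empty partition list yields an index in [0, numPartitions).
        System.out.println(guardedPartition("events-over-time", 42, Arrays.asList(0, 1, 2)));

        // An empty list (what the mocked partitionsFor returns) now fails
        // with a descriptive message.
        try {
            guardedPartition("events-over-time", 42, Collections.<Integer>emptyList());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Whether such a check belongs in the partitioner or in its caller is a design choice; the sketch puts it in the caller so that every partitioner benefits from the same error message.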
That said, in RecordCollector we can at least detect the `numPartitions` = 0 case and throw a more informative error. I will file a patch for this improvement.

Guozhang

On Wed, Sep 28, 2016 at 2:48 AM, Hamidreza Afzali <hamidreza.afz...@hivestreaming.com> wrote:

> Hi,
>
> We are using the latest Kafka 0.10.1 branch. The combination of
> ProcessorTopologyTestDriver and WindowedStreamPartitioner is resulting in a
> division by 0 exception because of the empty list of partitions:
>
> https://github.com/apache/kafka/blob/0.10.1/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java#L158
> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java#L47
>
> Our topology looks similar to this:
>
>     builder.stream("events")
>       .groupByKey(...)
>       .aggregate(...,
>         TimeWindows.of(1 * 60 * 1000L)
>       )
>       .mapValues(_.size: Integer)
>       .to(windowedSerde, Serdes.Integer(), "events-over-time")
>
> If we use our own partitioner in .to() it works.
>
>     class MyStreamPartitioner[K, V]() extends StreamPartitioner[K, V] {
>       override def partition(k: K, v: V, numPartitions: Int): Integer = {
>         // return an integer between 0 and numPartitions-1, or null if the
>         // default partitioning logic should be used
>         null
>       }
>     }
>
> Is this a bug?
>
> Thank you in advance,
> Hamid

--
-- Guozhang