Hi,

Thanks for your remarks,

MJS1:
Indeed, I started looking into updating Tasks and GlobalTasks dynamically when
a topic with more partitions is created, and it seems to introduce a lot of complexity.
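
The root of the problem is that the set of tasks depends on the partition counts of the source topics. Below is a minimal sketch of that dependency. It assumes a single sub-topology and one task per partition index up to the maximum partition count of its source topics, which is similar to how Kafka Streams sizes a sub-topology. `TaskLayoutSketch` and `taskIds` are hypothetical names, not driver code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper, not TopologyTestDriver code: one task per partition index,
// up to the maximum partition count among the sub-topology's source topics.
final class TaskLayoutSketch {
    static List<String> taskIds(final int subtopologyId, final Collection<Integer> sourcePartitionCounts) {
        final int maxPartitions = sourcePartitionCounts.stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0); // no source topics -> no tasks
        final List<String> ids = new ArrayList<>();
        for (int partition = 0; partition < maxPartitions; partition++) {
            // Same "<subtopology>_<partition>" form that TaskId.toString() prints
            ids.add(subtopologyId + "_" + partition);
        }
        return ids;
    }
}
```

With source topics of 1, 3 and 2 partitions, this yields tasks `0_0`, `0_1` and `0_2`. Creating a 4-partition topic later would add `0_3` while records are already being piped, which is exactly the update we would rather not support.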

I initially considered simply forbidding the creation of input/output topics 
after records have been sent; however, introducing a dedicated setup phase 
seems cleaner and easier to implement.

One concern is backward compatibility with existing tests. We could make 
driver.init() mandatory only if at least one topic has more than one partition, 
keeping single-partition tests unchanged.
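
A rough sketch of the lifecycle we have in mind, including the backward-compatibility rule above. `SetupPhaseSketch` and its methods are hypothetical stand-ins, not the proposed TopologyTestDriver API. They only model the state checks: topics are registered before `init()`, and `init()` is required only once a multi-partition topic exists.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the proposed setup phase; NOT the TopologyTestDriver API.
final class SetupPhaseSketch {
    private final Map<String, Integer> partitionsByTopic = new LinkedHashMap<>();
    private boolean initialized = false;
    private int processed = 0;

    // Topics may only be registered before init().
    void createTopic(final String name, final int partitions) {
        if (initialized) {
            throw new IllegalStateException("topics must be created before init()");
        }
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be >= 1");
        }
        partitionsByTopic.put(name, partitions);
    }

    // In the real driver, tasks and global tasks would be created here.
    void init() {
        if (initialized) {
            throw new IllegalStateException("init() already called");
        }
        initialized = true;
    }

    // Backward-compatibility rule: init() is mandatory only if some topic
    // has more than one partition.
    boolean requiresInit() {
        return partitionsByTopic.values().stream().anyMatch(p -> p > 1);
    }

    void pipeInput(final String topic) {
        if (!partitionsByTopic.containsKey(topic)) {
            throw new IllegalArgumentException("unknown topic: " + topic);
        }
        if (!initialized && requiresInit()) {
            throw new IllegalStateException("call init() before piping records");
        }
        processed++; // stand-in for actually processing the record
    }

    int processedCount() {
        return processed;
    }
}
```

Single-partition tests keep working without `init()`. A test with a 2-partition topic must call `init()` before piping, and no topic can be added afterwards.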

MJS2:
Good catch: your solution reduces the number of `pipeInput` and `readRecord`
overloads. We just need to add a new partition property to `TestRecord`.
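
For records without an explicit partition, the driver would fall back to the usual partitioning, since most tests rely on the default partitioner anyway. Below is a minimal sketch of that routing rule using a hypothetical `route` helper. An explicit partition is validated and used as-is. Otherwise, a key hash decides. The hash is `Objects.hashCode`, a stand-in. Kafka's default partitioner hashes the serialized key bytes with murmur2, so real partition numbers will differ.

```java
import java.util.Objects;

// Hypothetical routing rule for pipeInput(); names are illustrative only.
final class PartitionRoutingSketch {
    static int route(final Object key, final Integer explicitPartition, final int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be >= 1");
        }
        if (explicitPartition != null) {
            // Explicit partition on the record: validate and use it as-is.
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                throw new IllegalArgumentException(
                        "partition " + explicitPartition + " is not in [0, " + numPartitions + ")");
            }
            return explicitPartition;
        }
        // Fallback stand-in for the default partitioner. Kafka hashes the serialized
        // key with murmur2 (and treats null keys differently); this uses
        // Objects.hashCode, so a null key always lands on partition 0 here.
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }
}
```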

To summarize, a test could look like this:

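import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import static org.junit.jupiter.api.Assertions.assertThrows;

// Setup and initialization
TopologyTestDriver driver = new TopologyTestDriver(...);
// Proposed overloads: topic name plus number of partitions
TestInputTopic<String, String> inputTopic = driver.createInputTopic("topic1", 2);
TestOutputTopic<String, String> outputTopic = driver.createOutputTopic("topic2", 2);

// Topics exist, but records cannot be piped yet
driver.init(); // tasks and global tasks are created

// Partitions 0 and 1 are valid (proposed constructor: key, value, timestamp, partition)
TestRecord<String, String> record0 = new TestRecord<>("key0", "value0", 1000L, 0);
TestRecord<String, String> record1 = new TestRecord<>("key1", "value1", 1000L, 1);

inputTopic.pipeInput(record0);
inputTopic.pipeInput(record1);

// Invalid partition: piping it throws an IllegalArgumentException
TestRecord<String, String> record2 = new TestRecord<>("key2", "value2", 1000L, 2);
assertThrows(IllegalArgumentException.class, () -> inputTopic.pipeInput(record2));

// Reading output
List<TestRecord<String, String>> outputRecords = outputTopic.readRecordsToList();

// Filter by partition if needed
List<TestRecord<String, String>> partition0Records = outputRecords.stream()
    .filter(r -> r.getPartition() == 0)
    .collect(Collectors.toList());

// Optional helper: read directly from a specific partition.
// Alternative to the above: readRecordsToList() consumes the records,
// so use one approach or the other, not both.
TestOutputTopic<String, String> partition1Topic = outputTopic.partition(1);
List<TestRecord<String, String>> partition1Records = partition1Topic.readRecordsToList();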
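
Since `readRecordsToList()` consumes records, the per-partition helper has to define what reading one partition does to the others. One possible backing structure is sketched below with a hypothetical `OutputTopicSketch` class that stores plain values instead of `TestRecord`s. It keeps one queue per partition, so reading partition 1 leaves partition 0 untouched.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of an output topic with one queue per partition;
// not the proposed TestOutputTopic implementation.
final class OutputTopicSketch<V> {
    private final List<Deque<V>> queues = new ArrayList<>();

    OutputTopicSketch(final int numPartitions) {
        for (int p = 0; p < numPartitions; p++) {
            queues.add(new ArrayDeque<>());
        }
    }

    // Called when a task produces a record to the given partition.
    void append(final int partition, final V value) {
        queues.get(partition).addLast(value);
    }

    // Drains one partition only; other partitions keep their records.
    List<V> readPartition(final int partition) {
        if (partition < 0 || partition >= queues.size()) {
            throw new IllegalArgumentException("partition " + partition + " does not exist");
        }
        final Deque<V> queue = queues.get(partition);
        final List<V> out = new ArrayList<>(queue);
        queue.clear();
        return out;
    }

    // Drains all partitions, partition 0 first.
    List<V> readAll() {
        final List<V> out = new ArrayList<>();
        for (int p = 0; p < queues.size(); p++) {
            out.addAll(readPartition(p));
        }
        return out;
    }
}
```

The trade-off is that `readAll()` returns partition 0's records before partition 1's, losing the cross-partition order in which records were produced. Keeping a single queue with the partition stored on each `TestRecord` and filtering on read, as in the example above, preserves that order.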


We will wait for other comments, if any, before updating the KIP.

Regards,
Marie-Laure, Adam, Julien and Sébastien


From: Matthias J. Sax <[email protected]>
Date: Sunday, February 22, 2026 at 23:56
To: [email protected] <[email protected]>
Subject: [EXT] Re: [DISCUSS] KIP-1238: Multipartition for TopologyTestDriver in Kafka Streams


Hello everyone,

thanks for this KIP. It's indeed a long-standing feature request to
support multi-partition TTD, so I am very happy to see this KIP (sorry
for ignoring it for way too long...)


A couple of questions:


MJS1: The KIP states:

> Each time an input topic is created with a partition number higher than the
> max partition number already defined, the Tasks and GlobalTasks will be 
> updated.

This sounds quite complex? At the moment, because TTD is single-partitioned,
one can call `createInputTopic()` and `createOutputTopic()` at any time and
interleave this with piping records through the topology. Because there is
only a single partition/task, this does not complicate things.

However, with multiple partitions and tasks, I am worried that we might
introduce unnecessary complexity (potentially both in the TTD
implementation and for the user). Could it make sense to add a dedicated
setup phase? I.e., all input/output topics must be created before we allow
piping records? The control flow would be something like:

> TopologyTestDriver driver = new TopologyTestDriver(...);
> driver.createInputTopic(...);
> driver.createOutputTopic(...);
>
> // created TestInputTopics / TestOutputTopics cannot be used yet
>
> driver.init(); // sets up the driver and creates tasks
>
> // now TestInputTopic / TestOutputTopic can be used



MJS2: I am somewhat concerned about the number of overloads we are
adding to `TestInputTopic` and `TestOutputTopic`.

Would it be simpler to instead modify `TestRecord` and allow setting the
partition there, to avoid all the overloads of `pipeInput`? I would
assume that most tests would rely on `DefaultPartitioner` anyway, and it
seems ok to force the usage of `TestRecord` if a partition must be set
explicitly?

For `TestOutputTopic`, the partition number of each record would again be set
on the `TestRecord` (for `readRecord()` and `readRecordsToList()`).
We could also introduce `TestOutputTopicPartition` and add
`TestOutputTopic.partition(int)` to allow reading data from a specific
partition?



-Matthias






On 11/12/25 11:48 PM, Sebastien Viale wrote:
> Hi Everyone,
>
> I would like to start a discussion on KIP-1238: Multipartition for 
> TopologyTestDriver in Kafka Streams
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1238%3A+Multipartition+for+TopologyTestDriver+in+Kafka+Streams
>
> This KIP proposes to introduce multi-partition support in the 
> TopologyTestDriver, enabling more accurate and convenient stream testing 
> while improving automated unit test coverage.
>
>
> Regards,
>
>
> Julien & Adam & Marie-Laure & Sébastien
>
>
