Kin Siu created KAFKA-9244:
------------------------------
Summary: Update of old FK reference on RHS should not trigger join
result
Key: KAFKA-9244
URL: https://issues.apache.org/jira/browse/KAFKA-9244
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.4.0
Reporter: Kin Siu
When performing a KTable-KTable foreign-key join, after the LHS record's FK
reference has changed from FK1 -> FK2, a subsequent update to the RHS record
keyed by FK1 should not produce a join result.
{code:java}
@Test
public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
final Topology topology = getTopology(streamsConfig, "store", leftJoin);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology,
streamsConfig)) {
final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
final TestOutputTopic<String, String> outputTopic =
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new
StringDeserializer());
final KeyValueStore<String, String> store =
driver.getKeyValueStore("store");
// Pre-populate the RHS records. This test is all about what
happens when we change LHS records foreign key reference
// then populate update on RHS
right.pipeInput("rhs1", "rhsValue1");
right.pipeInput("rhs2", "rhsValue2");
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
assertThat(
asMap(store),
is(emptyMap())
);
left.pipeInput("lhs1", "lhsValue1|rhs1");
{
final Map<String, String> expected = mkMap(
mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
);
assertThat(
outputTopic.readKeyValuesToMap(),
is(expected)
);
assertThat(
asMap(store),
is(expected)
);
}
// Change LHS foreign key reference
left.pipeInput("lhs1", "lhsValue1|rhs2");
{
final Map<String, String> expected = mkMap(
mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
);
assertThat(
outputTopic.readKeyValuesToMap(),
is(expected)
);
assertThat(
asMap(store),
is(expected)
);
}
// Populate RHS update on old LHS foreign key ref
right.pipeInput("rhs1", "rhsValue1Delta");
{
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
))
);
}
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)