RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1691113488
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java: ########## @@ -69,46 +69,29 @@ class LocalInputPreferredSlotSharingStrategyTest { private TestingSchedulingExecutionVertex ev21; private TestingSchedulingExecutionVertex ev22; - private Set<SlotSharingGroup> slotSharingGroups; - - @BeforeEach - void setUp() { - topology = new TestingSchedulingTopology(); - + private void setupCase() { ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0); ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1); ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0); ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1); - final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); - slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1); - slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2); - slotSharingGroups = Collections.singleton(slotSharingGroup); + slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_1); + slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_2); } - @Test - void testCoLocationConstraintIsRespected() { - topology.connect(ev11, ev22); - topology.connect(ev12, ev21); - - final CoLocationGroup coLocationGroup = - new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2); - final Set<CoLocationGroup> coLocationGroups = Collections.singleton(coLocationGroup); - - final SlotSharingStrategy strategy = - new LocalInputPreferredSlotSharingStrategy( - topology, slotSharingGroups, coLocationGroups); - - assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2); - assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds()) - .contains(ev11.getId(), ev21.getId()); - assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds()) - .contains(ev12.getId(), ev22.getId()); + @Override + protected SlotSharingStrategy getSlotSharingStrategy( + SchedulingTopology topology, + Set<SlotSharingGroup> slotSharingGroups, + Set<CoLocationGroup> coLocationGroups) { + return new LocalInputPreferredSlotSharingStrategy( + topology, slotSharingGroups, coLocationGroups); } @Test void testInputLocalityIsRespectedWithRescaleEdge() { + setupCase(); Review Comment: Thank @1996fanrui you very much for the review , I updated related parts based on your comments. PTAL if you had the free time. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org