Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4761#discussion_r154026050 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.runtime.event.TaskEvent; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent; +import org.apache.flink.runtime.iterative.event.TerminationEvent; +import org.apache.flink.runtime.util.event.EventListener; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Basic tests for {@link TaskEventDispatcher}. + */ +public class TaskEventDispatcherTest extends TestLogger { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void registerPartitionTwice() throws Exception { + ResultPartitionID partitionId = new ResultPartitionID(); + TaskEventDispatcher ted = new TaskEventDispatcher(); + ted.registerPartition(partitionId); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("already registered at task event dispatcher"); + + ted.registerPartition(partitionId); + } + + @Test + public void subscribeToEventNotRegistered() throws Exception { + TaskEventDispatcher ted = new TaskEventDispatcher(); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("not registered at task event dispatcher"); + + //noinspection unchecked + ted.subscribeToEvent(new ResultPartitionID(), mock(EventListener.class), TaskEvent.class); + } + + /** + * Tests {@link TaskEventDispatcher#publish(ResultPartitionID, TaskEvent)} and {@link TaskEventDispatcher#subscribeToEvent(ResultPartitionID, EventListener, Class)} methods. + */ + @Test + public void publishSubscribe() throws Exception { + ResultPartitionID partitionId1 = new ResultPartitionID(); + ResultPartitionID partitionId2 = new ResultPartitionID(); + TaskEventDispatcher ted = new TaskEventDispatcher(); + + AllWorkersDoneEvent event1 = new AllWorkersDoneEvent(); + assertFalse(ted.publish(partitionId1, event1)); + + ted.registerPartition(partitionId1); + ted.registerPartition(partitionId2); + + // no event listener subscribed yet, but the event is forwarded to a TaskEventHandler + assertTrue(ted.publish(partitionId1, event1)); + + //noinspection unchecked + EventListener<TaskEvent> eventListener1a = mock(EventListener.class); --- End diff -- ``` = new OneShotEventListener(event1); = new ZeroShotEventListener(); ... ```
---