[ https://issues.apache.org/jira/browse/FLINK-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532726#comment-14532726 ]
ASF GitHub Bot commented on FLINK-1977:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/659#discussion_r29856312

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java ---
    @@ -53,18 +47,22 @@ public void fromCollectionTest() {
         @Test
         public void socketTextStreamTest() throws Exception {
    -        List<String> expectedList = Arrays.asList("a", "b", "c");
    -        List<String> actualList = new ArrayList<String>();
    -
    -        byte[] data = { 'a', '\n', 'b', '\n', 'c', '\n' };
    -
    -        Socket socket = mock(Socket.class);
    -        when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
    -        when(socket.isClosed()).thenReturn(false);
    -        when(socket.isConnected()).thenReturn(true);
    -
    -        new SocketTextStreamFunction("", 0, '\n', 0).streamFromSocket(new MockCollector<String>(
    -                actualList), socket);
    -        assertEquals(expectedList, actualList);
    +        // TODO: does not work because we cannot set the internal socket anymore
    --- End diff --

    This test never tested any actual networking functionality. I actually started a job with a socket source and it still works as it should.

> Rework Stream Operators to always be push based
> -----------------------------------------------
>
>                 Key: FLINK-1977
>                 URL: https://issues.apache.org/jira/browse/FLINK-1977
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> This is a result of the discussion on the mailing list. This is an excerpt
> from the mailing list that gives the basic idea of the change:
> I propose to change all streaming operators to be push based, with a
> slightly improved interface: In addition to collect(), which I would
> call receiveElement() I would add receivePunctuation() and
> receiveBarrier(). The first operator in the chain would also get data
> from the outside invokable that reads from the input iterator and
> calls receiveElement() for the first operator in a chain.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
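
For readers following the proposal quoted in the issue description, here is a rough sketch of what a push-based operator chain might look like. Only the method names receiveElement(), receivePunctuation() and receiveBarrier() come from the proposal. The parameter types, the PushOperator interface name, and the MapOperator, RecordingSink and drive() helpers are hypothetical and are not taken from the Flink code base.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class PushBasedSketch {

    /** Hypothetical push-based operator; the parameter types are assumptions. */
    interface PushOperator<T> {
        void receiveElement(T element);
        void receivePunctuation(long timestamp);
        void receiveBarrier(long barrierId);
    }

    /** Transforms each element and pushes the result to the next operator in the chain. */
    static final class MapOperator<I, O> implements PushOperator<I> {
        private final Function<I, O> fn;
        private final PushOperator<O> next;

        MapOperator(Function<I, O> fn, PushOperator<O> next) {
            this.fn = fn;
            this.next = next;
        }

        @Override
        public void receiveElement(I element) {
            next.receiveElement(fn.apply(element));
        }

        @Override
        public void receivePunctuation(long timestamp) {
            next.receivePunctuation(timestamp); // forwarded unchanged
        }

        @Override
        public void receiveBarrier(long barrierId) {
            next.receiveBarrier(barrierId); // forwarded unchanged
        }
    }

    /** End of the chain: records everything it receives so the order can be inspected. */
    static final class RecordingSink implements PushOperator<String> {
        final List<String> log = new ArrayList<>();

        @Override
        public void receiveElement(String element) {
            log.add(element);
        }

        @Override
        public void receivePunctuation(long timestamp) {
            log.add("punctuation:" + timestamp);
        }

        @Override
        public void receiveBarrier(long barrierId) {
            log.add("barrier:" + barrierId);
        }
    }

    /** Stand-in for the outer invokable: reads the input iterator and pushes into the chain head. */
    static <T> void drive(Iterator<T> input, PushOperator<T> head) {
        while (input.hasNext()) {
            head.receiveElement(input.next());
        }
    }

    static List<String> runDemo() {
        RecordingSink sink = new RecordingSink();
        PushOperator<String> head = new MapOperator<String, String>(String::toUpperCase, sink);
        drive(Arrays.asList("a", "b").iterator(), head);
        head.receiveBarrier(1L); // a barrier travels through the same chain as the data
        drive(Arrays.asList("c").iterator(), head);
        return sink.log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In this sketch no operator ever pulls from an iterator itself. Only drive() touches the input, which is the role the proposal assigns to the outside invokable.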